refactor(proactive):将主动性消息整合到统一调度框架中

- 将主动思考能力直接整合进亲和力交流流程中
- 以可配置的动态调度系统替换传统间隔系统
- 通过白名单/黑名单过滤实现细粒度控制
- 增加基于时间的频率调制和冷却周期管理功能
- 移除独立的proactive_thinker插件,采用集成化方案替代
- 更新配置架构,增加增强型主动消息参数
This commit is contained in:
Windpicker-owo
2025-10-31 12:27:01 +08:00
parent 4dbc8b5d15
commit 246a15daae
14 changed files with 1336 additions and 963 deletions

View File

@@ -23,6 +23,21 @@ async def send_message(message: MessageSending, show_log=True) -> bool:
await get_global_api().send_message(message)
if show_log:
logger.info(f"已将消息 '{message_preview}' 发往平台'{message.message_info.platform}'")
# 触发 AFTER_SEND 事件
try:
from src.plugin_system.core.event_manager import event_manager
from src.plugin_system.base.component_types import EventType
if message.chat_stream:
await event_manager.trigger_event(
EventType.AFTER_SEND,
kwargs={"stream_id": message.chat_stream.stream_id, "message": message},
trigger_source="SYSTEM",
)
except Exception as event_error:
logger.error(f"触发 AFTER_SEND 事件时出错: {event_error}")
return True
except Exception as e:

View File

@@ -757,7 +757,62 @@ class ProactiveThinkingConfig(ValidatedConfigBase):
)
# --- 冷启动配置 (针对私聊) ---
enable_cold_start: bool = Field(default=True, description="对于白名单中不活跃的私聊,是否允许进行一次“冷启动”问候")
enable_cold_start: bool = Field(default=True, description='对于白名单中不活跃的私聊,是否允许进行一次"冷启动"问候')
cold_start_cooldown: int = Field(
default=86400, description="冷启动后,该私聊的下一次主动思考需要等待的最小时间(秒)"
)
# --- 新增:间隔配置 ---
base_interval: int = Field(default=1800, ge=60, description="基础触发间隔默认30分钟")
min_interval: int = Field(default=600, ge=60, description="最小触发间隔默认10分钟。兴趣分数高时会接近此值")
max_interval: int = Field(default=7200, ge=60, description="最大触发间隔默认2小时。兴趣分数低时会接近此值")
# --- 新增:动态调整配置 ---
use_interest_score: bool = Field(default=True, description="是否根据兴趣分数动态调整间隔。关闭则使用固定base_interval")
interest_score_factor: float = Field(default=2.0, ge=1.0, le=3.0, description="兴趣分数影响因子。公式: interval = base * (factor - score)")
# --- 新增:黑白名单配置 ---
whitelist_mode: bool = Field(default=False, description="是否启用白名单模式。启用后只对白名单中的聊天流生效")
blacklist_mode: bool = Field(default=False, description="是否启用黑名单模式。启用后排除黑名单中的聊天流")
whitelist_private: list[str] = Field(
default_factory=list,
description='私聊白名单,格式: ["platform:user_id:private", "qq:12345:private"]'
)
whitelist_group: list[str] = Field(
default_factory=list,
description='群聊白名单,格式: ["platform:group_id:group", "qq:123456:group"]'
)
blacklist_private: list[str] = Field(
default_factory=list,
description='私聊黑名单,格式: ["platform:user_id:private", "qq:12345:private"]'
)
blacklist_group: list[str] = Field(
default_factory=list,
description='群聊黑名单,格式: ["platform:group_id:group", "qq:123456:group"]'
)
# --- 新增:兴趣分数阈值 ---
min_interest_score: float = Field(default=0.0, ge=0.0, le=1.0, description="最低兴趣分数阈值,低于此值不会主动思考")
max_interest_score: float = Field(default=1.0, ge=0.0, le=1.0, description="最高兴趣分数阈值,高于此值不会主动思考(用于限制过度活跃)")
# --- 新增:时间策略配置 ---
enable_time_strategy: bool = Field(default=False, description="是否启用时间策略(根据时段调整频率)")
quiet_hours_start: str = Field(default="00:00", description='安静时段开始时间,格式: "HH:MM"')
quiet_hours_end: str = Field(default="07:00", description='安静时段结束时间,格式: "HH:MM"')
active_hours_multiplier: float = Field(default=0.7, ge=0.1, le=2.0, description="活跃时段间隔倍数,<1表示更频繁>1表示更稀疏")
# --- 新增:冷却与限制 ---
reply_reset_enabled: bool = Field(default=True, description="bot回复后是否重置定时器避免回复后立即又主动发言")
topic_throw_cooldown: int = Field(default=3600, ge=0, description="抛出话题后的冷却时间(秒),期间暂停主动思考")
max_daily_proactive: int = Field(default=0, ge=0, description="每个聊天流每天最多主动发言次数0表示不限制")
# --- 新增:决策权重配置 ---
do_nothing_weight: float = Field(default=0.4, ge=0.0, le=1.0, description="do_nothing动作的基础权重")
simple_bubble_weight: float = Field(default=0.3, ge=0.0, le=1.0, description="simple_bubble动作的基础权重")
throw_topic_weight: float = Field(default=0.3, ge=0.0, le=1.0, description="throw_topic动作的基础权重")
# --- 新增:调试与监控 ---
enable_statistics: bool = Field(default=True, description="是否启用统计功能(记录触发次数、决策分布等)")
log_decisions: bool = Field(default=False, description="是否记录每次决策的详细日志(用于调试)")

View File

@@ -422,6 +422,14 @@ MoFox_Bot(第三方修改版)
except Exception as e:
logger.error(f"注册API路由失败: {e}")
# 初始化统一调度器
try:
from src.schedule.unified_scheduler import initialize_scheduler
await initialize_scheduler()
except Exception as e:
logger.error(f"统一调度器初始化失败: {e}")
# 加载所有插件
plugin_manager.load_all_plugins()
@@ -486,14 +494,6 @@ MoFox_Bot(第三方修改版)
# 初始化计划相关组件
await self._init_planning_components()
# 初始化统一调度器
try:
from src.schedule.unified_scheduler import initialize_scheduler
await initialize_scheduler()
except Exception as e:
logger.error(f"统一调度器初始化失败: {e}")
# 触发启动事件
try:
await event_manager.trigger_event(EventType.ON_START, permission_group="SYSTEM")

View File

@@ -68,4 +68,20 @@ class AffinityChatterPlugin(BasePlugin):
except Exception as e:
logger.error(f"加载 ChatStreamImpressionTool 时出错: {e}")
try:
# 延迟导入 ProactiveThinkingReplyHandler
from .proactive_thinking_event import ProactiveThinkingReplyHandler
components.append((ProactiveThinkingReplyHandler.get_handler_info(), ProactiveThinkingReplyHandler))
except Exception as e:
logger.error(f"加载 ProactiveThinkingReplyHandler 时出错: {e}")
try:
# 延迟导入 ProactiveThinkingMessageHandler
from .proactive_thinking_event import ProactiveThinkingMessageHandler
components.append((ProactiveThinkingMessageHandler.get_handler_info(), ProactiveThinkingMessageHandler))
except Exception as e:
logger.error(f"加载 ProactiveThinkingMessageHandler 时出错: {e}")
return components

View File

@@ -0,0 +1,146 @@
"""
主动思考事件处理器
监听bot的reply事件在reply后重置对应聊天流的主动思考定时任务
"""
from src.common.logger import get_logger
from src.plugin_system import BaseEventHandler, EventType
from src.plugin_system.base.base_event import HandlerResult
from src.plugins.built_in.affinity_flow_chatter.proactive_thinking_scheduler import (
proactive_thinking_scheduler,
)
logger = get_logger("proactive_thinking_event")
class ProactiveThinkingReplyHandler(BaseEventHandler):
"""Reply事件处理器
当bot回复某个聊天流后
1. 如果该聊天流的主动思考被暂停(因为抛出了话题),则恢复它
2. 无论是否暂停,都重置定时任务,重新开始计时
"""
handler_name: str = "proactive_thinking_reply_handler"
handler_description: str = "监听reply事件重置主动思考定时任务"
init_subscribe: list[EventType | str] = [EventType.AFTER_SEND]
async def execute(self, kwargs: dict | None) -> HandlerResult:
"""处理reply事件
Args:
kwargs: 事件参数,应包含 stream_id
Returns:
HandlerResult: 处理结果
"""
if not kwargs:
return HandlerResult(success=True, continue_process=True, message=None)
stream_id = kwargs.get("stream_id")
if not stream_id:
logger.warning("Reply事件缺少stream_id参数")
return HandlerResult(success=True, continue_process=True, message=None)
try:
from src.config.config import global_config
# 检查是否启用reply重置
if not global_config.proactive_thinking.reply_reset_enabled:
return HandlerResult(success=True, continue_process=True, message=None)
# 检查是否被暂停
was_paused = await proactive_thinking_scheduler.is_paused(stream_id)
if was_paused:
logger.info(f"检测到reply事件聊天流 {stream_id} 之前因抛出话题而暂停,现在恢复")
# 重置定时任务(这会自动清除暂停标记并创建新任务)
success = await proactive_thinking_scheduler.schedule_proactive_thinking(stream_id)
if success:
if was_paused:
logger.info(f"聊天流 {stream_id} 的主动思考已恢复并重置")
else:
logger.debug(f"聊天流 {stream_id} 的主动思考定时任务已重置")
else:
logger.warning(f"重置聊天流 {stream_id} 的主动思考任务失败")
except Exception as e:
logger.error(f"处理reply事件时出错: {e}", exc_info=True)
# 总是继续处理其他handler
return HandlerResult(success=True, continue_process=True, message=None)
class ProactiveThinkingMessageHandler(BaseEventHandler):
"""消息事件处理器
当收到消息时,如果该聊天流还没有主动思考任务,则创建一个
这样可以确保新的聊天流也能获得主动思考功能
"""
handler_name: str = "proactive_thinking_message_handler"
handler_description: str = "监听消息事件,为新聊天流创建主动思考任务"
init_subscribe: list[EventType | str] = [EventType.ON_MESSAGE]
async def execute(self, kwargs: dict | None) -> HandlerResult:
"""处理消息事件
Args:
kwargs: 事件参数,格式为 {"message": MessageRecv}
Returns:
HandlerResult: 处理结果
"""
if not kwargs:
return HandlerResult(success=True, continue_process=True, message=None)
# 从 kwargs 中获取 MessageRecv 对象
message = kwargs.get("message")
if not message or not hasattr(message, "chat_stream"):
return HandlerResult(success=True, continue_process=True, message=None)
# 从 chat_stream 获取 stream_id
chat_stream = message.chat_stream
if not chat_stream or not hasattr(chat_stream, "stream_id"):
return HandlerResult(success=True, continue_process=True, message=None)
stream_id = chat_stream.stream_id
try:
from src.config.config import global_config
# 检查是否启用主动思考
if not global_config.proactive_thinking.enable:
return HandlerResult(success=True, continue_process=True, message=None)
# 检查该聊天流是否已经有任务
task_info = await proactive_thinking_scheduler.get_task_info(stream_id)
if task_info:
# 已经有任务,不需要创建
return HandlerResult(success=True, continue_process=True, message=None)
# 从 message_info 获取平台和聊天ID信息
message_info = message.message_info
platform = message_info.platform
is_group = message_info.group_info is not None
chat_id = message_info.group_info.group_id if is_group else message_info.user_info.user_id # type: ignore
# 构造配置字符串
stream_config = f"{platform}:{chat_id}:{'group' if is_group else 'private'}"
# 检查黑白名单
if not proactive_thinking_scheduler._check_whitelist_blacklist(stream_config):
return HandlerResult(success=True, continue_process=True, message=None)
# 创建主动思考任务
success = await proactive_thinking_scheduler.schedule_proactive_thinking(stream_id)
if success:
logger.info(f"为新聊天流 {stream_id} 创建了主动思考任务")
except Exception as e:
logger.error(f"处理消息事件时出错: {e}", exc_info=True)
# 总是继续处理其他handler
return HandlerResult(success=True, continue_process=True, message=None)

View File

@@ -0,0 +1,538 @@
"""
主动思考执行器
当定时任务触发时负责搜集信息、调用LLM决策、并根据决策生成回复
"""
import json
from datetime import datetime
from typing import Any, Literal, Optional
from sqlalchemy import select
from src.chat.express.expression_learner import expression_learner_manager
from src.chat.express.expression_selector import expression_selector
from src.common.database.sqlalchemy_database_api import get_db_session
from src.common.database.sqlalchemy_models import ChatStreams
from src.common.logger import get_logger
from src.config.config import global_config, model_config
from src.individuality.individuality import Individuality
from src.llm_models.utils_model import LLMRequest
from src.plugin_system.apis import chat_api, message_api, send_api
logger = get_logger("proactive_thinking_executor")
class ProactiveThinkingPlanner:
"""主动思考规划器
负责:
1. 搜集信息(聊天流印象、话题关键词、历史聊天记录)
2. 调用LLM决策什么都不做/简单冒泡/抛出话题
3. 根据决策生成回复内容
"""
def __init__(self):
"""初始化规划器"""
try:
self.decision_llm = LLMRequest(
model_set=model_config.model_task_config.utils,
request_type="proactive_thinking_decision"
)
self.reply_llm = LLMRequest(
model_set=model_config.model_task_config.replyer,
request_type="proactive_thinking_reply"
)
except Exception as e:
logger.error(f"初始化LLM失败: {e}")
self.decision_llm = None
self.reply_llm = None
async def gather_context(self, stream_id: str) -> Optional[dict[str, Any]]:
"""搜集聊天流的上下文信息
Args:
stream_id: 聊天流ID
Returns:
dict: 包含所有上下文信息的字典失败返回None
"""
try:
# 1. 获取聊天流印象数据
stream_data = await self._get_stream_impression(stream_id)
if not stream_data:
logger.warning(f"无法获取聊天流 {stream_id} 的印象数据")
return None
# 2. 获取最近的聊天记录
recent_messages = await message_api.get_recent_messages(
chat_id=stream_id,
limit=20,
limit_mode="latest",
hours=24
)
recent_chat_history = ""
if recent_messages:
recent_chat_history = await message_api.build_readable_messages_to_str(recent_messages)
# 3. 获取bot人设
individuality = Individuality()
bot_personality = await individuality.get_personality_block()
# 4. 构建上下文
context = {
"stream_id": stream_id,
"stream_name": stream_data.get("stream_name", "未知"),
"stream_impression": stream_data.get("stream_impression_text", "暂无印象"),
"chat_style": stream_data.get("stream_chat_style", "未知"),
"topic_keywords": stream_data.get("stream_topic_keywords", ""),
"interest_score": stream_data.get("stream_interest_score", 0.5),
"recent_chat_history": recent_chat_history or "暂无最近聊天记录",
"bot_personality": bot_personality,
"current_time": datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
}
logger.debug(f"成功搜集聊天流 {stream_id} 的上下文信息")
return context
except Exception as e:
logger.error(f"搜集上下文信息失败: {e}", exc_info=True)
return None
async def _get_stream_impression(self, stream_id: str) -> Optional[dict[str, Any]]:
"""从数据库获取聊天流印象数据"""
try:
async with get_db_session() as session:
stmt = select(ChatStreams).where(ChatStreams.stream_id == stream_id)
result = await session.execute(stmt)
stream = result.scalar_one_or_none()
if not stream:
return None
return {
"stream_name": stream.group_name or "私聊",
"stream_impression_text": stream.stream_impression_text or "",
"stream_chat_style": stream.stream_chat_style or "",
"stream_topic_keywords": stream.stream_topic_keywords or "",
"stream_interest_score": float(stream.stream_interest_score) if stream.stream_interest_score else 0.5,
}
except Exception as e:
logger.error(f"获取聊天流印象失败: {e}")
return None
async def make_decision(
self, context: dict[str, Any]
) -> Optional[dict[str, Any]]:
"""使用LLM进行决策
Args:
context: 上下文信息
Returns:
dict: 决策结果,包含:
- action: "do_nothing" | "simple_bubble" | "throw_topic"
- reasoning: 决策理由
- topic: (可选) 如果是throw_topic包含话题内容
"""
if not self.decision_llm:
logger.error("决策LLM未初始化")
return None
response = None
try:
decision_prompt = self._build_decision_prompt(context)
if global_config.debug.show_prompt:
logger.info(f"决策提示词:\n{decision_prompt}")
response, _ = await self.decision_llm.generate_response_async(prompt=decision_prompt)
if not response:
logger.warning("LLM未返回有效响应")
return None
# 清理并解析JSON响应
cleaned_response = self._clean_json_response(response)
decision = json.loads(cleaned_response)
logger.info(
f"决策结果: {decision.get('action', 'unknown')} - {decision.get('reasoning', '无理由')}"
)
return decision
except json.JSONDecodeError as e:
logger.error(f"解析决策JSON失败: {e}")
if response:
logger.debug(f"原始响应: {response}")
return None
except Exception as e:
logger.error(f"决策过程失败: {e}", exc_info=True)
return None
def _build_decision_prompt(self, context: dict[str, Any]) -> str:
"""构建决策提示词"""
return f"""你是一个有着独特个性的AI助手。你的人设是
{context['bot_personality']}
现在是 {context['current_time']},你正在考虑是否要主动在 "{context['stream_name']}" 中说些什么。
【聊天环境信息】
- 整体印象: {context['stream_impression']}
- 聊天风格: {context['chat_style']}
- 常见话题: {context['topic_keywords'] or '暂无'}
- 你的兴趣程度: {context['interest_score']:.2f}/1.0
【最近的聊天记录】
{context['recent_chat_history']}
请根据以上信息,决定你现在应该做什么:
**选项1什么都不做 (do_nothing)**
- 适用场景:现在可能是休息时间、工作时间,或者气氛不适合说话
- 也可能是:最近聊天很活跃不需要你主动、没什么特别想说的、此时说话会显得突兀
**选项2简单冒个泡 (simple_bubble)**
- 适用场景:群里有点冷清,你想引起注意或活跃气氛
- 方式:简单问个好、发个表情、说句无关紧要的话,没有深意,就是刷个存在感
**选项3抛出一个话题 (throw_topic)**
- 适用场景:历史消息中有未讨论完的话题、你有自己的想法、或者想深入聊某个主题
- 方式:明确提出一个话题,希望得到回应和讨论
请以JSON格式回复你的决策
{{
"action": "do_nothing" | "simple_bubble" | "throw_topic",
"reasoning": "你的决策理由,说明为什么选择这个行动",
"topic": "(仅当action=throw_topic时填写)你想抛出的具体话题"
}}
注意:
1. 如果最近聊天很活跃不到1小时倾向于选择 do_nothing
2. 如果你对这个环境兴趣不高(<0.4),倾向于选择 do_nothing 或 simple_bubble
3. 只有在真的有话题想聊时才选择 throw_topic
4. 符合你的人设,不要太过热情或冷淡
"""
async def generate_reply(
self,
context: dict[str, Any],
action: Literal["simple_bubble", "throw_topic"],
topic: Optional[str] = None
) -> Optional[str]:
"""生成回复内容
Args:
context: 上下文信息
action: 动作类型
topic: (可选) 话题内容当action=throw_topic时必须提供
Returns:
str: 生成的回复文本失败返回None
"""
if not self.reply_llm:
logger.error("回复LLM未初始化")
return None
try:
reply_prompt = await self._build_reply_prompt(context, action, topic)
if global_config.debug.show_prompt:
logger.info(f"回复提示词:\n{reply_prompt}")
response, _ = await self.reply_llm.generate_response_async(prompt=reply_prompt)
if not response:
logger.warning("LLM未返回有效回复")
return None
logger.info(f"生成回复成功: {response[:50]}...")
return response.strip()
except Exception as e:
logger.error(f"生成回复失败: {e}", exc_info=True)
return None
async def _get_expression_habits(self, stream_id: str, chat_history: str) -> str:
"""获取表达方式参考
Args:
stream_id: 聊天流ID
chat_history: 聊天历史
Returns:
str: 格式化的表达方式参考文本
"""
try:
# 使用表达方式选择器获取合适的表达方式
selected_expressions = await expression_selector.select_suitable_expressions(
chat_id=stream_id,
chat_history=chat_history,
target_message=None, # 主动思考没有target message
max_num=6, # 主动思考时使用较少的表达方式
min_num=2
)
if not selected_expressions:
return ""
style_habits = []
grammar_habits = []
for expr in selected_expressions:
if isinstance(expr, dict) and "situation" in expr and "style" in expr:
expr_type = expr.get("type", "style")
if expr_type == "grammar":
grammar_habits.append(f"{expr['situation']}时,使用 {expr['style']}")
else:
style_habits.append(f"{expr['situation']}时,使用 {expr['style']}")
expression_block = ""
if style_habits or grammar_habits:
expression_block = "\n【表达方式参考】\n"
if style_habits:
expression_block += "语言习惯:\n" + "\n".join(style_habits) + "\n"
if grammar_habits:
expression_block += "句法特点:\n" + "\n".join(grammar_habits) + "\n"
expression_block += "注意:仅在情景合适时自然地使用这些表达,不要生硬套用。\n"
return expression_block
except Exception as e:
logger.warning(f"获取表达方式失败: {e}")
return ""
async def _build_reply_prompt(
self,
context: dict[str, Any],
action: Literal["simple_bubble", "throw_topic"],
topic: Optional[str]
) -> str:
"""构建回复提示词"""
# 获取表达方式参考
expression_habits = await self._get_expression_habits(
stream_id=context.get('stream_id', ''),
chat_history=context.get('recent_chat_history', '')
)
if action == "simple_bubble":
return f"""你是一个有着独特个性的AI助手。你的人设是
{context['bot_personality']}
现在是 {context['current_time']},你决定在 "{context['stream_name']}" 中简单冒个泡。
【聊天环境】
- 整体印象: {context['stream_impression']}
- 聊天风格: {context['chat_style']}
【最近的聊天记录】
{context['recent_chat_history']}
{expression_habits}
请生成一条简短的消息,用于活跃气氛或刷存在感。要求:
1. 非常简短5-15字
2. 轻松随意,不要有明确的话题或问题
3. 可以是:问候、表达心情、随口一句话
4. 符合你的人设和当前聊天风格
5. 如果有表达方式参考,在合适时自然使用
直接输出消息内容,不要解释:"""
else: # throw_topic
return f"""你是一个有着独特个性的AI助手。你的人设是
{context['bot_personality']}
现在是 {context['current_time']},你决定在 "{context['stream_name']}" 中抛出一个话题。
【聊天环境】
- 整体印象: {context['stream_impression']}
- 聊天风格: {context['chat_style']}
- 常见话题: {context['topic_keywords'] or '暂无'}
【最近的聊天记录】
{context['recent_chat_history']}
【你想抛出的话题】
{topic}
{expression_habits}
请根据这个话题生成一条消息,要求:
1. 明确提出话题,引导讨论
2. 长度适中20-50字
3. 自然地引入话题,不要生硬
4. 可以结合最近的聊天记录
5. 符合你的人设和当前聊天风格
6. 如果有表达方式参考,在合适时自然使用
直接输出消息内容,不要解释:"""
def _clean_json_response(self, response: str) -> str:
"""清理LLM响应中的JSON格式标记"""
import re
cleaned = response.strip()
cleaned = re.sub(r"^```(?:json)?\s*", "", cleaned, flags=re.MULTILINE | re.IGNORECASE)
cleaned = re.sub(r"\s*```$", "", cleaned, flags=re.MULTILINE)
json_start = cleaned.find("{")
json_end = cleaned.rfind("}")
if json_start != -1 and json_end != -1 and json_end > json_start:
cleaned = cleaned[json_start:json_end + 1]
return cleaned.strip()
# 全局规划器实例
_planner = ProactiveThinkingPlanner()
# 统计数据
_statistics: dict[str, dict[str, Any]] = {}
def _update_statistics(stream_id: str, action: str):
"""更新统计数据
Args:
stream_id: 聊天流ID
action: 执行的动作
"""
if stream_id not in _statistics:
_statistics[stream_id] = {
"total_executions": 0,
"do_nothing_count": 0,
"simple_bubble_count": 0,
"throw_topic_count": 0,
"last_execution_time": None,
}
_statistics[stream_id]["total_executions"] += 1
_statistics[stream_id][f"{action}_count"] += 1
_statistics[stream_id]["last_execution_time"] = datetime.now().isoformat()
def get_statistics(stream_id: Optional[str] = None) -> dict[str, Any]:
"""获取统计数据
Args:
stream_id: 聊天流IDNone表示获取所有统计
Returns:
统计数据字典
"""
if stream_id:
return _statistics.get(stream_id, {})
return _statistics
async def execute_proactive_thinking(stream_id: str):
"""执行主动思考(被调度器调用的回调函数)
Args:
stream_id: 聊天流ID
"""
from src.config.config import global_config
from src.plugins.built_in.affinity_flow_chatter.proactive_thinking_scheduler import (
proactive_thinking_scheduler,
)
config = global_config.proactive_thinking
logger.info(f"开始为聊天流 {stream_id} 执行主动思考")
try:
# 0. 前置检查
# 检查是否在安静时段
if proactive_thinking_scheduler._is_in_quiet_hours():
logger.debug(f"当前在安静时段,跳过主动思考")
return
# 检查每日限制
if not proactive_thinking_scheduler._check_daily_limit(stream_id):
logger.info(f"聊天流 {stream_id} 今日主动发言次数已达上限")
return
# 1. 搜集信息
context = await _planner.gather_context(stream_id)
if not context:
logger.warning(f"无法搜集聊天流 {stream_id} 的上下文,跳过本次主动思考")
return
# 检查兴趣分数阈值
interest_score = context.get('interest_score', 0.5)
if not proactive_thinking_scheduler._check_interest_score_threshold(interest_score):
logger.info(f"聊天流 {stream_id} 兴趣分数不在阈值范围内")
return
# 2. 进行决策
decision = await _planner.make_decision(context)
if not decision:
logger.warning(f"决策失败,跳过本次主动思考")
return
action = decision.get("action", "do_nothing")
reasoning = decision.get("reasoning", "")
# 记录决策日志
if config.log_decisions:
logger.info(f"[决策详情] stream_id={stream_id}, action={action}, reasoning={reasoning}")
# 3. 根据决策执行相应动作
if action == "do_nothing":
logger.info(f"决策:什么都不做。理由:{reasoning}")
return
elif action == "simple_bubble":
logger.info(f"决策:简单冒个泡。理由:{reasoning}")
# 生成简单的消息
reply = await _planner.generate_reply(context, "simple_bubble")
if reply:
await send_api.text_to_stream(stream_id=stream_id, text=reply)
logger.info(f"已发送冒泡消息到 {stream_id}")
# 增加每日计数
proactive_thinking_scheduler._increment_daily_count(stream_id)
# 更新统计
if config.enable_statistics:
_update_statistics(stream_id, action)
# 冒泡后可以继续主动思考,不需要暂停
elif action == "throw_topic":
topic = decision.get("topic", "")
logger.info(f"决策:抛出话题。理由:{reasoning},话题:{topic}")
if not topic:
logger.warning("选择了抛出话题但未提供话题内容,降级为冒泡")
reply = await _planner.generate_reply(context, "simple_bubble")
else:
# 生成基于话题的消息
reply = await _planner.generate_reply(context, "throw_topic", topic)
if reply:
await send_api.text_to_stream(stream_id=stream_id, text=reply)
logger.info(f"已发送话题消息到 {stream_id}")
# 增加每日计数
proactive_thinking_scheduler._increment_daily_count(stream_id)
# 更新统计
if config.enable_statistics:
_update_statistics(stream_id, action)
# 抛出话题后暂停主动思考(如果配置了冷却时间)
if config.topic_throw_cooldown > 0:
await proactive_thinking_scheduler.pause_proactive_thinking(stream_id, reason="已抛出话题")
# 设置定时恢复在reply_reset_enabled关闭时使用
if not config.reply_reset_enabled:
import asyncio
async def resume_after_cooldown():
await asyncio.sleep(config.topic_throw_cooldown)
await proactive_thinking_scheduler.schedule_proactive_thinking(stream_id)
asyncio.create_task(resume_after_cooldown())
except Exception as e:
logger.error(f"执行主动思考失败: {e}", exc_info=True)

View File

@@ -0,0 +1,498 @@
"""
主动思考调度器
基于统一调度器(unified_scheduler),为每个聊天流管理主动思考任务
根据聊天流的兴趣分数(stream_interest_score)动态计算触发间隔
"""
import asyncio
from datetime import datetime, timedelta
from typing import Any, Optional
from src.common.database.sqlalchemy_database_api import get_db_session
from src.common.database.sqlalchemy_models import ChatStreams
from src.common.logger import get_logger
from src.schedule.unified_scheduler import TriggerType, unified_scheduler
from sqlalchemy import select
logger = get_logger("proactive_thinking_scheduler")
class ProactiveThinkingScheduler:
"""主动思考调度器
负责为每个聊天流创建和管理主动思考任务。
特点:
1. 根据聊天流的兴趣分数动态计算触发间隔
2. 在bot回复后自动重置定时任务
3. 抛出话题后暂停等待下次reply后才恢复
"""
def __init__(self):
"""初始化调度器"""
self._stream_schedules: dict[str, str] = {} # stream_id -> schedule_id
self._paused_streams: set[str] = set() # 因抛出话题而暂停的聊天流
self._lock = asyncio.Lock()
# 统计数据
self._statistics: dict[str, dict[str, Any]] = {} # stream_id -> 统计信息
self._daily_counts: dict[str, dict[str, int]] = {} # stream_id -> {date: count}
# 从全局配置加载(延迟导入避免循环依赖)
from src.config.config import global_config
self.config = global_config.proactive_thinking
def _calculate_interval(self, interest_score: float) -> int:
"""根据兴趣分数计算触发间隔
Args:
interest_score: 聊天流兴趣分数 (0.0-1.0)
Returns:
int: 触发间隔(秒)
公式:
- 兴趣分数越高,间隔越短(更频繁思考)
- interval = base_interval * (factor - interest_score)
- 例如interest_score=0.5 -> interval=1800*1.5=2700秒(45分钟)
- interest_score=0.8 -> interval=1800*1.2=2160秒(36分钟)
- interest_score=0.2 -> interval=1800*1.8=3240秒(54分钟)
"""
# 如果不使用兴趣分数,直接返回基础间隔
if not self.config.use_interest_score:
return self.config.base_interval
# 确保分数在有效范围内
interest_score = max(0.0, min(1.0, interest_score))
# 计算间隔:分数越高,系数越小,间隔越短
factor = self.config.interest_score_factor - interest_score
interval = int(self.config.base_interval * factor)
# 限制在最小和最大间隔之间
interval = max(self.config.min_interval, min(self.config.max_interval, interval))
logger.debug(f"兴趣分数 {interest_score:.2f} -> 触发间隔 {interval}秒 ({interval/60:.1f}分钟)")
return interval
def _check_whitelist_blacklist(self, stream_config: str) -> bool:
"""检查聊天流是否通过黑白名单验证
Args:
stream_config: 聊天流配置字符串,格式: "platform:id:type"
Returns:
bool: True表示允许主动思考False表示拒绝
"""
# 解析类型
parts = stream_config.split(":")
if len(parts) != 3:
logger.warning(f"无效的stream_config格式: {stream_config}")
return False
is_private = parts[2] == "private"
# 检查基础开关
if is_private and not self.config.enable_in_private:
return False
if not is_private and not self.config.enable_in_group:
return False
# 黑名单检查(优先级高)
if self.config.blacklist_mode:
blacklist = self.config.blacklist_private if is_private else self.config.blacklist_group
if stream_config in blacklist:
logger.debug(f"聊天流 {stream_config} 在黑名单中,拒绝主动思考")
return False
# 白名单检查
if self.config.whitelist_mode:
whitelist = self.config.whitelist_private if is_private else self.config.whitelist_group
if stream_config not in whitelist:
logger.debug(f"聊天流 {stream_config} 不在白名单中,拒绝主动思考")
return False
return True
def _check_interest_score_threshold(self, interest_score: float) -> bool:
"""检查兴趣分数是否在阈值范围内
Args:
interest_score: 兴趣分数
Returns:
bool: True表示在范围内
"""
if interest_score < self.config.min_interest_score:
logger.debug(f"兴趣分数 {interest_score:.2f} 低于最低阈值 {self.config.min_interest_score}")
return False
if interest_score > self.config.max_interest_score:
logger.debug(f"兴趣分数 {interest_score:.2f} 高于最高阈值 {self.config.max_interest_score}")
return False
return True
def _check_daily_limit(self, stream_id: str) -> bool:
"""检查今日主动发言次数是否超限
Args:
stream_id: 聊天流ID
Returns:
bool: True表示未超限
"""
if self.config.max_daily_proactive == 0:
return True # 不限制
today = datetime.now().strftime("%Y-%m-%d")
if stream_id not in self._daily_counts:
self._daily_counts[stream_id] = {}
# 清理过期日期的数据
for date in list(self._daily_counts[stream_id].keys()):
if date != today:
del self._daily_counts[stream_id][date]
count = self._daily_counts[stream_id].get(today, 0)
if count >= self.config.max_daily_proactive:
logger.debug(f"聊天流 {stream_id} 今日主动发言次数已达上限 ({count}/{self.config.max_daily_proactive})")
return False
return True
def _increment_daily_count(self, stream_id: str):
"""增加今日主动发言计数"""
today = datetime.now().strftime("%Y-%m-%d")
if stream_id not in self._daily_counts:
self._daily_counts[stream_id] = {}
self._daily_counts[stream_id][today] = self._daily_counts[stream_id].get(today, 0) + 1
def _is_in_quiet_hours(self) -> bool:
"""检查当前是否在安静时段
Returns:
bool: True表示在安静时段
"""
if not self.config.enable_time_strategy:
return False
now = datetime.now()
current_time = now.strftime("%H:%M")
start = self.config.quiet_hours_start
end = self.config.quiet_hours_end
# 处理跨日的情况如23:00-07:00
if start <= end:
return start <= current_time <= end
else:
return current_time >= start or current_time <= end
async def _get_stream_interest_score(self, stream_id: str) -> float:
"""从数据库获取聊天流的兴趣分数
Args:
stream_id: 聊天流ID
Returns:
float: 兴趣分数默认0.5
"""
try:
async with get_db_session() as session:
stmt = select(ChatStreams).where(ChatStreams.stream_id == stream_id)
result = await session.execute(stmt)
stream = result.scalar_one_or_none()
if stream and stream.stream_interest_score is not None:
return float(stream.stream_interest_score)
else:
return 0.5 # 默认中等兴趣
except Exception as e:
logger.error(f"获取聊天流 {stream_id} 兴趣分数失败: {e}")
return 0.5
async def schedule_proactive_thinking(self, stream_id: str) -> bool:
"""为聊天流创建或重置主动思考任务
Args:
stream_id: 聊天流ID
Returns:
bool: 是否成功创建/重置任务
"""
try:
async with self._lock:
# 如果该流因抛出话题而暂停,先清除暂停标记
if stream_id in self._paused_streams:
logger.info(f"清除聊天流 {stream_id} 的暂停标记")
self._paused_streams.discard(stream_id)
# 如果已经有任务,先移除
if stream_id in self._stream_schedules:
old_schedule_id = self._stream_schedules[stream_id]
await unified_scheduler.remove_schedule(old_schedule_id)
logger.debug(f"移除聊天流 {stream_id} 的旧任务")
# 获取兴趣分数并计算间隔
interest_score = await self._get_stream_interest_score(stream_id)
interval_seconds = self._calculate_interval(interest_score)
# 导入回调函数(延迟导入避免循环依赖)
from src.plugins.built_in.affinity_flow_chatter.proactive_thinking_executor import (
execute_proactive_thinking,
)
# 创建新任务
schedule_id = await unified_scheduler.create_schedule(
callback=execute_proactive_thinking,
trigger_type=TriggerType.TIME,
trigger_config={
"delay_seconds": interval_seconds,
},
is_recurring=True,
task_name=f"ProactiveThinking-{stream_id}",
callback_args=(stream_id,),
)
self._stream_schedules[stream_id] = schedule_id
# 计算下次触发时间
next_run_time = datetime.now() + timedelta(seconds=interval_seconds)
logger.info(
f"为聊天流 {stream_id} 创建主动思考任务\n"
f" - 兴趣分数: {interest_score:.2f}\n"
f" - 触发间隔: {interval_seconds}秒 ({interval_seconds/60:.1f}分钟)\n"
f" - 下次触发: {next_run_time.strftime('%Y-%m-%d %H:%M:%S')}"
)
return True
except Exception as e:
logger.error(f"为聊天流 {stream_id} 创建主动思考任务失败: {e}", exc_info=True)
return False
async def pause_proactive_thinking(self, stream_id: str, reason: str = "抛出话题") -> bool:
"""暂停聊天流的主动思考任务
当选择"抛出话题"后,应该暂停该聊天流的主动思考,
直到bot至少执行过一次reply后才恢复。
Args:
stream_id: 聊天流ID
reason: 暂停原因
Returns:
bool: 是否成功暂停
"""
try:
async with self._lock:
if stream_id not in self._stream_schedules:
logger.warning(f"尝试暂停不存在的任务: {stream_id}")
return False
schedule_id = self._stream_schedules[stream_id]
success = await unified_scheduler.pause_schedule(schedule_id)
if success:
self._paused_streams.add(stream_id)
logger.info(f"暂停聊天流 {stream_id} 的主动思考任务,原因: {reason}")
return success
except Exception as e:
logger.error(f"暂停聊天流 {stream_id} 的主动思考任务失败: {e}", exc_info=True)
return False
async def resume_proactive_thinking(self, stream_id: str) -> bool:
"""恢复聊天流的主动思考任务
Args:
stream_id: 聊天流ID
Returns:
bool: 是否成功恢复
"""
try:
async with self._lock:
if stream_id not in self._stream_schedules:
logger.warning(f"尝试恢复不存在的任务: {stream_id}")
return False
schedule_id = self._stream_schedules[stream_id]
success = await unified_scheduler.resume_schedule(schedule_id)
if success:
self._paused_streams.discard(stream_id)
logger.info(f"恢复聊天流 {stream_id} 的主动思考任务")
return success
except Exception as e:
logger.error(f"恢复聊天流 {stream_id} 的主动思考任务失败: {e}", exc_info=True)
return False
async def cancel_proactive_thinking(self, stream_id: str) -> bool:
"""取消聊天流的主动思考任务
Args:
stream_id: 聊天流ID
Returns:
bool: 是否成功取消
"""
try:
async with self._lock:
if stream_id not in self._stream_schedules:
return True # 已经不存在,视为成功
schedule_id = self._stream_schedules.pop(stream_id)
self._paused_streams.discard(stream_id)
success = await unified_scheduler.remove_schedule(schedule_id)
logger.info(f"取消聊天流 {stream_id} 的主动思考任务")
return success
except Exception as e:
logger.error(f"取消聊天流 {stream_id} 的主动思考任务失败: {e}", exc_info=True)
return False
async def is_paused(self, stream_id: str) -> bool:
"""检查聊天流的主动思考是否被暂停
Args:
stream_id: 聊天流ID
Returns:
bool: 是否暂停中
"""
async with self._lock:
return stream_id in self._paused_streams
async def get_task_info(self, stream_id: str) -> Optional[dict[str, Any]]:
"""获取聊天流的主动思考任务信息
Args:
stream_id: 聊天流ID
Returns:
dict: 任务信息如果不存在返回None
"""
async with self._lock:
if stream_id not in self._stream_schedules:
return None
schedule_id = self._stream_schedules[stream_id]
task_info = await unified_scheduler.get_task_info(schedule_id)
if task_info:
task_info["is_paused_for_topic"] = stream_id in self._paused_streams
return task_info
async def list_all_tasks(self) -> list[dict[str, Any]]:
"""列出所有主动思考任务
Returns:
list: 任务信息列表
"""
async with self._lock:
tasks = []
for stream_id, schedule_id in self._stream_schedules.items():
task_info = await unified_scheduler.get_task_info(schedule_id)
if task_info:
task_info["stream_id"] = stream_id
task_info["is_paused_for_topic"] = stream_id in self._paused_streams
tasks.append(task_info)
return tasks
def get_statistics(self) -> dict[str, Any]:
"""获取调度器统计信息
Returns:
dict: 统计信息
"""
return {
"total_scheduled_streams": len(self._stream_schedules),
"paused_for_topic": len(self._paused_streams),
"active_tasks": len(self._stream_schedules) - len(self._paused_streams),
}
async def log_next_trigger_times(self, max_streams: int = 10):
"""在日志中输出聊天流的下次触发时间
Args:
max_streams: 最多显示多少个聊天流0表示全部
"""
logger.info("=" * 60)
logger.info("主动思考任务状态")
logger.info("=" * 60)
tasks = await self.list_all_tasks()
if not tasks:
logger.info("当前没有活跃的主动思考任务")
logger.info("=" * 60)
return
# 按下次触发时间排序
tasks_sorted = sorted(
tasks,
key=lambda x: x.get("next_run_time", datetime.max) or datetime.max
)
# 限制显示数量
if max_streams > 0:
tasks_sorted = tasks_sorted[:max_streams]
logger.info(f"共有 {len(self._stream_schedules)} 个任务,显示前 {len(tasks_sorted)}")
logger.info("")
for i, task in enumerate(tasks_sorted, 1):
stream_id = task.get("stream_id", "Unknown")
next_run = task.get("next_run_time")
is_paused = task.get("is_paused_for_topic", False)
# 获取聊天流名称(如果可能)
stream_name = stream_id[:16] + "..." if len(stream_id) > 16 else stream_id
if next_run:
# 计算剩余时间
now = datetime.now()
remaining = next_run - now
remaining_seconds = int(remaining.total_seconds())
if remaining_seconds < 0:
time_str = "已过期(待执行)"
elif remaining_seconds < 60:
time_str = f"{remaining_seconds}秒后"
elif remaining_seconds < 3600:
time_str = f"{remaining_seconds // 60}分钟后"
else:
hours = remaining_seconds // 3600
minutes = (remaining_seconds % 3600) // 60
time_str = f"{hours}小时{minutes}分钟后"
status = "⏸️ 暂停中" if is_paused else "✅ 活跃"
logger.info(
f"[{i:2d}] {status} | {stream_name}\n"
f" 下次触发: {next_run.strftime('%Y-%m-%d %H:%M:%S')} ({time_str})"
)
else:
logger.info(
f"[{i:2d}] ⚠️ 未知 | {stream_name}\n"
f" 下次触发: 未设置"
)
logger.info("")
logger.info("=" * 60)
# 全局调度器实例
proactive_thinking_scheduler = ProactiveThinkingScheduler()

View File

@@ -1,14 +0,0 @@
from src.plugin_system.base.plugin_metadata import PluginMetadata
__plugin_meta__ = PluginMetadata(
name="MoFox-Bot主动思考",
description="主动思考插件",
usage="该插件由系统自动触发。",
version="1.0.0",
author="MoFox-Studio",
license="GPL-v3.0-or-later",
repository_url="https://github.com/MoFox-Studio",
keywords=["主动思考", "自己发消息"],
categories=["Chat", "Integration"],
extra={"is_built_in": True, "plugin_type": "functional"},
)

View File

@@ -1,39 +0,0 @@
from src.common.logger import get_logger
from src.plugin_system import (
BaseEventHandler,
BasePlugin,
ConfigField,
EventHandlerInfo,
register_plugin,
)
from src.plugin_system.base.base_plugin import BasePlugin
from .proacive_thinker_event import ProactiveThinkerEventHandler
logger = get_logger(__name__)
@register_plugin
class ProactiveThinkerPlugin(BasePlugin):
"""一个主动思考的插件"""
plugin_name: str = "proactive_thinker"
enable_plugin: bool = True
dependencies: list[str] = []
python_dependencies: list[str] = []
config_file_name: str = "config.toml"
config_schema: dict = {
"plugin": {
"config_version": ConfigField(type=str, default="1.1.0", description="配置文件版本"),
},
}
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
def get_plugin_components(self) -> list[tuple[EventHandlerInfo, type[BaseEventHandler]]]:
"""返回插件的EventHandler组件"""
components: list[tuple[EventHandlerInfo, type[BaseEventHandler]]] = [
(ProactiveThinkerEventHandler.get_handler_info(), ProactiveThinkerEventHandler)
]
return components

View File

@@ -1,246 +0,0 @@
import asyncio
import random
import time
import traceback
from datetime import datetime
from maim_message import UserInfo
from src.chat.message_receive.chat_stream import get_chat_manager
from src.common.logger import get_logger
from src.config.config import global_config
from src.manager.async_task_manager import AsyncTask, async_task_manager
from src.plugin_system import BaseEventHandler, EventType
from src.plugin_system.apis import chat_api, message_api, person_api
from src.plugin_system.base.base_event import HandlerResult
from .proactive_thinker_executor import ProactiveThinkerExecutor
logger = get_logger(__name__)
class ColdStartTask(AsyncTask):
"""
“冷启动”任务,在机器人启动时执行一次。
它的核心职责是“唤醒”那些因重启而“沉睡”的聊天流,确保它们能够接收主动思考。
对于在白名单中但从未有过记录的全新用户,它也会发起第一次“破冰”问候。
"""
def __init__(self, bot_start_time: float):
super().__init__(task_name="ColdStartTask")
self.chat_manager = get_chat_manager()
self.executor = ProactiveThinkerExecutor()
self.bot_start_time = bot_start_time
async def run(self):
"""任务主逻辑,在启动后执行一次白名单扫描。"""
logger.info("冷启动任务已启动,将在短暂延迟后开始唤醒沉睡的聊天流...")
await asyncio.sleep(30) # 延迟以确保所有服务和聊天流已从数据库加载完毕
try:
logger.info("【冷启动】开始扫描白名单,唤醒沉睡的聊天流...")
# 【修复】增加对私聊总开关的判断
if not global_config.proactive_thinking.enable_in_private:
logger.info("【冷启动】私聊主动思考功能未启用,任务结束。")
return
enabled_private_chats = global_config.proactive_thinking.enabled_private_chats
if not enabled_private_chats:
logger.debug("【冷启动】私聊白名单为空,任务结束。")
return
for chat_id in enabled_private_chats:
try:
platform, user_id_str = chat_id.split(":")
user_id = int(user_id_str)
should_wake_up = False
stream = chat_api.get_stream_by_user_id(user_id_str, platform)
if not stream:
should_wake_up = True
logger.info(f"【冷启动】发现全新用户 {chat_id},准备发起第一次问候。")
elif stream.last_active_time < self.bot_start_time:
should_wake_up = True
logger.info(
f"【冷启动】发现沉睡的聊天流 {chat_id} (最后活跃于 {datetime.fromtimestamp(stream.last_active_time)}),准备唤醒。"
)
if should_wake_up:
person_id = person_api.get_person_id(platform, user_id)
nickname = await person_api.get_person_value(person_id, "nickname")
user_nickname = nickname or f"用户{user_id}"
user_info = UserInfo(platform=platform, user_id=str(user_id), user_nickname=user_nickname)
# 使用 get_or_create_stream 来安全地获取或创建流
stream = await self.chat_manager.get_or_create_stream(platform, user_info)
formatted_stream_id = f"{stream.user_info.platform}:{stream.user_info.user_id}:private"
await self.executor.execute(stream_id=formatted_stream_id, start_mode="cold_start")
logger.info(f"【冷启动】已为用户 {chat_id} (昵称: {user_nickname}) 发送唤醒/问候消息。")
except ValueError:
logger.warning(f"【冷启动】白名单条目格式错误或用户ID无效已跳过: {chat_id}")
except Exception as e:
logger.error(f"【冷启动】处理用户 {chat_id} 时发生未知错误: {e}", exc_info=True)
except asyncio.CancelledError:
logger.info("冷启动任务被正常取消。")
except Exception as e:
logger.error(f"【冷启动】任务出现严重错误: {e}", exc_info=True)
finally:
logger.info("【冷启动】任务执行完毕。")
class ProactiveThinkingTask(AsyncTask):
"""
主动思考的后台任务(日常唤醒),负责在聊天“冷却”后重新活跃气氛。
它只处理已经存在的聊天流。
"""
def __init__(self):
super().__init__(task_name="ProactiveThinkingTask")
self.chat_manager = get_chat_manager()
self.executor = ProactiveThinkerExecutor()
def _get_next_interval(self) -> float:
"""
动态计算下一次执行的时间间隔,模拟人类行为的随机性。
结合了基础间隔、随机偏移和每日不同时段的活跃度调整。
"""
# 从配置中读取基础间隔和随机范围
base_interval = global_config.proactive_thinking.interval
sigma = global_config.proactive_thinking.interval_sigma
# 1. 在 [base - sigma, base + sigma] 范围内随机取一个值
interval = random.uniform(base_interval - sigma, base_interval + sigma)
# 2. 根据当前时间,应用活跃度调整因子
now = datetime.now()
current_time_str = now.strftime("%H:%M")
adjust_rules = global_config.proactive_thinking.talk_frequency_adjust
if adjust_rules and adjust_rules[0]:
# 按时间对规则排序,确保能找到正确的时间段
rules = sorted([rule.split(",") for rule in adjust_rules[0][1:]], key=lambda x: x[0])
factor = 1.0
# 找到最后一个小于等于当前时间的规则
for time_str, factor_str in rules:
if current_time_str >= time_str:
factor = float(factor_str)
else:
break # 后面的时间都比当前晚,无需再找
# factor > 1 表示更活跃,所以用除法来缩短间隔
interval /= factor
# 保证最小间隔,防止过于频繁的骚扰
return max(60.0, interval)
async def run(self):
"""任务主循环,周期性地检查所有已存在的聊天是否需要“唤醒”。"""
logger.info("日常唤醒任务已启动,将根据动态间隔检查聊天活跃度。")
await asyncio.sleep(15) # 初始等待
while True:
# 计算下一次检查前的休眠时间
next_interval = self._get_next_interval()
try:
logger.debug(f"【日常唤醒】下一次检查将在 {next_interval:.2f} 秒后进行。")
await asyncio.sleep(next_interval)
logger.info("【日常唤醒】开始检查不活跃的聊天...")
# 加载白名单配置
enabled_private = set(global_config.proactive_thinking.enabled_private_chats)
enabled_groups = set(global_config.proactive_thinking.enabled_group_chats)
# 分别处理私聊和群聊
# 1. 处理私聊:首先检查私聊总开关
if global_config.proactive_thinking.enable_in_private:
for chat_id in enabled_private:
try:
platform, user_id_str = chat_id.split(":")
# 【核心逻辑】检查聊天流是否存在。不存在则跳过交由ColdStartTask处理。
stream = chat_api.get_stream_by_user_id(user_id_str, platform)
if not stream:
continue
# 检查冷却时间
recent_messages = await message_api.get_recent_messages(
chat_id=stream.stream_id, limit=1, limit_mode="latest"
)
last_message_time = recent_messages[0]["time"] if recent_messages else stream.create_time
time_since_last_active = time.time() - last_message_time
if time_since_last_active > next_interval:
logger.info(
f"【日常唤醒-私聊】聊天流 {stream.stream_id} 已冷却 {time_since_last_active:.2f} 秒,触发主动对话。"
)
formatted_stream_id = f"{stream.user_info.platform}:{stream.user_info.user_id}:private"
await self.executor.execute(stream_id=formatted_stream_id, start_mode="wake_up")
stream.update_active_time()
await self.chat_manager._save_stream(stream)
except ValueError:
logger.warning(f"【日常唤醒】私聊白名单条目格式错误,已跳过: {chat_id}")
except Exception as e:
logger.error(f"【日常唤醒】处理私聊用户 {chat_id} 时发生未知错误: {e}", exc_info=True)
# 2. 处理群聊:首先检查群聊总开关
if global_config.proactive_thinking.enable_in_group:
all_streams = list(self.chat_manager.streams.values())
for stream in all_streams:
if not stream.group_info:
continue # 只处理群聊
# 【修复】检查群聊是否在白名单内
if f"qq:{stream.group_info.group_id}" in enabled_groups:
# 检查冷却时间
recent_messages = await message_api.get_recent_messages(chat_id=stream.stream_id, limit=1)
last_message_time = recent_messages[0]["time"] if recent_messages else stream.create_time
time_since_last_active = time.time() - last_message_time
if time_since_last_active > next_interval:
logger.info(
f"【日常唤醒-群聊】聊天流 {stream.stream_id} 已冷却 {time_since_last_active:.2f} 秒,触发主动对话。"
)
formatted_stream_id = f"{stream.user_info.platform}:{stream.group_info.group_id}:group"
await self.executor.execute(stream_id=formatted_stream_id, start_mode="wake_up")
stream.update_active_time()
await self.chat_manager._save_stream(stream)
except asyncio.CancelledError:
logger.info("日常唤醒任务被正常取消。")
break
except Exception as e:
traceback.print_exc() # 打印完整的堆栈跟踪
logger.error(f"【日常唤醒】任务出现错误将在60秒后重试: {e}", exc_info=True)
await asyncio.sleep(60)
class ProactiveThinkerEventHandler(BaseEventHandler):
"""主动思考插件的启动事件处理器,负责根据配置启动一个或两个后台任务。"""
handler_name: str = "proactive_thinker_on_start"
handler_description: str = "主动思考插件的启动事件处理器"
init_subscribe: list[EventType | str] = [EventType.ON_START]
async def execute(self, kwargs: dict | None) -> "HandlerResult":
"""在机器人启动时执行,根据配置决定是否启动后台任务。"""
logger.info("检测到插件启动事件,正在初始化【主动思考】")
# 检查总开关
if global_config.proactive_thinking.enable:
bot_start_time = time.time() # 记录“诞生时刻”
# 启动负责“日常唤醒”的核心任务
proactive_task = ProactiveThinkingTask()
await async_task_manager.add_task(proactive_task)
# 检查“冷启动”功能的独立开关
if global_config.proactive_thinking.enable_cold_start:
cold_start_task = ColdStartTask(bot_start_time)
await async_task_manager.add_task(cold_start_task)
else:
logger.info("【主动思考】功能未启用,所有任务均跳过启动。")
return HandlerResult(success=True, continue_process=True, message=None)

View File

@@ -1,199 +0,0 @@
# 主动聊天功能重构与设计方案
本文档旨在规划一个全新的、真正的“主动发起对话”功能。方案的核心是创建一个独立的、可配置的插件,并重构现有配置,使其更具模块化和可扩展性。
## 1. 配置文件重构 (`bot_config.toml`)
为了提高清晰度和模块化,我们将创建一个新的配置节 `[proactive_thinking]`
### 1.1. 移除旧配置
以下配置项将从 `[chat]` 配置节中 **移除**:
```toml
# mmc/config/bot_config.toml
# 从 line 132 开始移除以下所有行
talk_frequency_adjust = [['', '8:00,1', '12:00,1.2', '18:00,1.5', '01:00,0.6'], ['qq:114514:group', '12:20,1', '16:10,2', '20:10,1', '00:10,0.3'], ['qq:1919810:private', '8:20,1', '12:10,2', '20:10,1.5', '00:10,0.2']]
# ... (所有 talk_frequency_adjust 的注释) ...
# 主动思考功能配置仅在focus模式下生效
enable_proactive_thinking = false
proactive_thinking_interval = 1500
# ... (所有 proactive_thinking 的注释和相关配置) ...
delta_sigma = 120
# ... (所有 delta_sigma 的注释和相关配置) ...
```
### 1.2. 新增 `[proactive_thinking]` 配置节
`bot_config.toml` 文件 **末尾**,添加以下全新配置节:
```toml
# mmc/config/bot_config.toml
[proactive_thinking] # 主动思考(主动发起对话)功能配置
# --- 总开关 ---
enable = false # 是否启用主动发起对话功能
# --- 触发时机 ---
# 基础触发间隔AI会围绕这个时间点主动发起对话
interval = 1500 # 默认25分钟
# 间隔随机化标准差让触发时间更自然。设为0则为固定间隔。
interval_sigma = 120
# 每日活跃度调整,格式:[["", "HH:MM,factor", ...], ["stream_id", ...]]
# factor > 1.0 会缩短思考间隔更活跃factor < 1.0 会延长间隔。
talk_frequency_adjust = [['', '8:00,1', '12:00,1.2', '18:00,1.5', '01:00,0.6']]
# --- 作用范围 ---
enable_in_private = true # 是否允许在私聊中主动发起对话
enable_in_group = true # 是否允许在群聊中主动发起对话
# 私聊白名单,为空则对所有私聊生效
# 格式: ["platform:user_id", ...] e.g., ["qq:123456"]
enabled_private_chats = []
# 群聊白名单,为空则对所有群聊生效
# 格式: ["platform:group_id", ...] e.g., ["qq:7891011"]
enabled_group_chats = []
# --- 冷启动配置 (针对私聊) ---
# 对于白名单中不活跃的私聊,是否允许进行一次“冷启动”问候
enable_cold_start = true
# 冷启动后,该私聊的下一次主动思考需要等待的最小时间(秒)
cold_start_cooldown = 86400 # 默认24小时
```
## 2. 新插件架构设计 (`proactive_initiation_chatter`)
我们将创建一个全新的插件来实现此功能。
### 2.1. 文件结构
```
mmc/src/plugins/built_in/proactive_initiation_chatter/
├── __init__.py
├── _manifest.json
├── plugin.py # 插件主入口,负责启动和管理触发器
├── trigger_manager.py # 核心触发器,内置于插件中
├── initiation_chatter.py # Chatter实现监听触发事件
└── initiation_planner.py # 规划器,负责决定“说什么”
```
### 2.2. 核心组件设计
#### `plugin.py` - `ProactiveInitiationPlugin`
- **职责**: 作为插件的入口,它将在插件被加载时,读取 `[proactive_thinking]` 配置,并根据配置启动 `ProactiveTriggerManager`
- **启动逻辑 (参考 `maizone_refactored`)**:
#### `trigger_manager.py` - `ProactiveTriggerManager`
- **职责**: 这是一个后台服务类,负责管理所有聊天流的触发计时器,并实现包括“冷启动”在内的所有复杂触发逻辑。
- **核心逻辑 (参考 `SchedulerService`)**:
- 维护一个异步主循环,定期检查所有符合条件的聊天流。
- 根据配置的间隔和活跃度调整,计算下次触发时间。
- 在触发时,调用 `InitiationPlanner` 来决定具体内容,并通过事件管理器派发 `ProactiveInitiationEvent``ColdStartInitiationEvent`
---
## 3. 核心交互与依赖
新的 `proactive_initiation_chatter` 插件将与以下核心系统模块进行交互,以确保其决策的智能性和合规性:
- **`Config`**: `TriggerManager``Planner` 将从全局配置中读取 `[proactive_thinking]` 配置节,以获取所有行为参数。
- **`EventManager`**: `TriggerManager` 将通过事件管理器派发 `ProactiveInitiationEvent``ColdStartInitiationEvent` 事件。`InitiationChatter` 则会监听这些事件以触发执行。
- **`AsyncTaskManager`**: `ProactiveInitiationPlugin` 将使用此管理器来安全地在后台运行 `TriggerManager` 的主循环。
- **`ChatManager` (from `chat_stream.py`)**: 这是实现“冷启动”的核心。`TriggerManager` 将调用 `chat_manager.get_or_create_stream()` 方法来按需“唤醒”或创建不活跃的聊天流实例及其附带的空上下文。
- **`SleepManager`**: 在每次触发决策前,`TriggerManager` **必须**查询 `SleepManager` 以确认AI当前未处于睡眠状态。
- **`ScheduleManager` / `MonthlyPlanManager`**: `InitiationPlanner` 的“待办任务驱动”策略会查询这些管理器,以获取可作为聊天话题的日程或计划。
- **`MemoryManager` / `ContextManager`**: `InitiationPlanner` 的“记忆驱动”策略会查询长期记忆和短期上下文,以寻找关联性话题。
- **`RelationshipManager`**: `InitiationPlanner` 可以查询关系分数,作为执行某些话题策略的门槛。
## 4. 插件清单文件 (`_manifest.json`)
插件的清单文件将定义其元数据和依赖。
```json
{
"manifest_version": 1,
"name": "ProactiveInitiationChatter",
"version": "1.0.0",
"author": "Kilo Code",
"description": "一个真正的主动发起对话插件,由内置的、可高度配置的触发器驱动。",
"dependencies": [],
"python_dependencies": []
}
```
---
## 5. 上下文获取与“唤醒”机制详解
本设计区分了“热启动”(针对活跃聊天)和“冷启动”(针对非活跃聊天)两种场景,并利用 `ChatManager` 的不同方法来优雅地处理。
### 热启动流程 (Hot Start - 针对活跃聊天)
这是最常见的场景。当一个聊天流近期有过对话,其实例存在于 `ChatManager` 的内存缓存中。
1. **获取现有上下文**: `ProactiveTriggerManager` 决定对一个活跃的 `stream_id` 发起对话时,它会调用 `chat_manager.get_stream(stream_id)`
2. **返回缓存实例**: `ChatManager` 会直接从内存中返回缓存的 `ChatStream` 实例。
3. **传递丰富上下文**: 这个实例中包含了**完整的、包含近期对话历史**的 `stream_context`
4. **智能决策**: `TriggerManager` 将这个**充满信息**的上下文派发给 `InitiationPlanner``Planner` 因此可以优先使用“记忆驱动”等高级策略,生成与前文高度相关的话题,使对话显得自然、连贯。
### 冷启动流程 (Cold Start - “唤醒”非活跃聊天)
针对在白名单中,但当前未加载到内存的私聊。
**核心方法:** `ChatManager.get_or_create_stream(platform, user_info, group_info)`
**唤醒流程:**
1. `ProactiveTriggerManager` 在主循环中识别到一个需要“冷启动”的私聊 `stream_id`
2. `TriggerManager` 构造出必要的 `UserInfo` 对象。
3. 它调用 `get_chat_manager()`,然后执行核心的唤醒调用:
```python
# (伪代码)
chat_stream = await chat_manager.get_or_create_stream(...)
```
4. 此调用会从数据库加载或全新创建一个 `ChatStream` 实例,该实例内部会自动创建一个**不包含任何历史消息的空上下文**。
5. `TriggerManager` 将这个**空的 `StreamContext`** 连同 `ColdStartInitiationEvent` 事件一同派发出去,以触发通用的问候语。
此双轨制流程无需修改任何核心系统代码,仅通过合理调用现有接口即可实现,保证了方案的稳定性和兼容性。
---
这份经过强化的设计文档详细说明了配置文件的修改方案、新插件的内部架构以及与核心系统的交互模式。请您审阅。如果这份蓝图符合您的预期,我们就可以准备将此计划交付实施。
另外附加:我计划在 InitiationPlanner 中实现一个策略选择系统。每次被 TriggerManager 触发时,它会评估多种“主动聊天策略”的“适宜度分数”,然后选择分数最高的策略来执行。
以下是我初步设计的几种策略:
ColdStartGreetingStrategy (冷启动问候策略)
触发条件:仅在 TriggerManager 派发 ColdStartInitiationEvent 事件时触发。
核心逻辑:生成一句通用的、友好的问候语,比如“你好呀!”或者“最近怎么样?”。这是为了“唤醒”那些很久没聊天的私聊对象。
适宜度分数:固定高分(例如 1.0),确保在冷启动时优先执行。
MemoryDrivenStrategy (记忆驱动策略)
触发条件:常规触发 (ProactiveInitiationEvent),且当前聊天流的上下文不为空。
核心逻辑:
查询 MemoryManager获取关于当前聊天对象的长期记忆或近期摘要。
查询 ContextManager分析最近的几条对话寻找可以延续的话题。
利用 LLM 生成一个与上下文或记忆相关的话题。例如:“我们上次聊到的那个项目,后来进展如何了?”
适宜度分数计算 (借鉴AFC)
context_relevance_score (上下文相关性):上下文越丰富、越接近现在,分数越高。
relationship_score (关系分):从 RelationshipManager 获取,关系越好,越适合深入聊记忆话题。
final_score = (context_relevance_score * 权重) + (relationship_score * 权重)
TaskDrivenStrategy (任务/日程驱动策略)
触发条件:常规触发。
核心逻辑:
查询 ScheduleManager 或 MonthlyPlanManager看看今天或最近有没有“待办事项”或“计划”。
如果有,可以围绕这个任务发起对话。例如:“我看到日程表上说今天要去图书馆,准备好了吗?”
适宜度分数计算:
task_urgency_score (任务紧急度):任务越紧急,分数越高。
task_relevance_score (任务相关度):如果任务与当前聊天对象有关,分数更高。
final_score = (task_urgency_score * 权重) + (task_relevance_score * 权重)
GenericTopicStrategy (通用话题策略)
触发条件:作为所有其他策略都无法执行时的“兜底”策略。
核心逻辑:从一个预设的话题库(或者让 LLM 随机生成)中挑选一个通用的话题,比如“今天天气不错,适合出门散步呢”或者“最近有什么有趣的新闻吗?”。
适宜度分数:固定低分(例如 0.1),确保它是最后的选择。

View File

@@ -1,337 +0,0 @@
import time
from datetime import datetime
from typing import Any
import orjson
from src.chat.utils.chat_message_builder import build_readable_actions, get_actions_by_timestamp_with_chat
from src.chat.utils.prompt import Prompt
from src.common.logger import get_logger
from src.config.config import global_config, model_config
from src.mood.mood_manager import mood_manager
from src.person_info.person_info import get_person_info_manager
from src.plugin_system.apis import (
chat_api,
database_api,
generator_api,
llm_api,
message_api,
person_api,
schedule_api,
send_api,
)
from .prompts import DECISION_PROMPT, PLAN_PROMPT
logger = get_logger(__name__)
class ProactiveThinkerExecutor:
"""
主动思考执行器 V2
- 统一执行入口
- 引入决策模块,判断是否及如何发起对话
- 结合人设、日程、关系信息生成更具情境的对话
"""
def __init__(self):
"""
初始化 ProactiveThinkerExecutor 实例。
目前无需初始化操作。
"""
pass
async def execute(self, stream_id: str, start_mode: str = "wake_up"):
"""
统一执行入口
Args:
stream_id: 聊天流ID
start_mode: 启动模式, 'cold_start''wake_up'
"""
logger.info(f"开始为聊天流 {stream_id} 执行主动思考,模式: {start_mode}")
# 1. 信息收集
context = await self._gather_context(stream_id)
if not context:
return
# 2. 决策阶段
decision_result = await self._make_decision(context, start_mode)
if not decision_result or not decision_result.get("should_reply"):
reason = decision_result.get("reason", "未提供") if decision_result else "决策过程返回None"
logger.info(f"决策结果为:不回复。原因: {reason}")
await database_api.store_action_info(
chat_stream=self._get_stream_from_id(stream_id),
action_name="proactive_decision",
action_prompt_display=f"主动思考决定不回复,原因: {reason}",
action_done=True,
action_data=decision_result,
)
return
# 3. 规划与执行阶段
topic = decision_result.get("topic", "打个招呼")
reason = decision_result.get("reason", "")
await database_api.store_action_info(
chat_stream=self._get_stream_from_id(stream_id),
action_name="proactive_decision",
action_prompt_display=f"主动思考决定回复,原因: {reason},话题:{topic}",
action_done=True,
action_data=decision_result,
)
logger.info(f"决策结果为:回复。话题: {topic}")
# 根据聊天类型构建特定上下文
if context["chat_type"] == "private":
user_info = context["user_info"]
relationship = context["relationship"]
target_user_or_group = f"你的朋友 '{user_info.user_nickname}'"
context_specific_block = f"""
1. **你的日程**:
{context["schedule_context"]}
2. **你和Ta的关系**:
- 详细印象: {relationship["impression"]}
- 好感度: {relationship["attitude"]}/100
3. **最近的聊天摘要**:
{context["recent_chat_history"]}
4. **你最近的相关动作**:
{context["action_history_context"]}
"""
else: # group
group_info = context["group_info"]
target_user_or_group = f"群聊 '{group_info['group_name']}'"
context_specific_block = f"""
1. **你的日程**:
{context["schedule_context"]}
2. **群聊信息**:
- 群名称: {group_info["group_name"]}
3. **最近的聊天摘要**:
{context["recent_chat_history"]}
4. **你最近的相关动作**:
{context["action_history_context"]}
"""
plan_prompt = PLAN_PROMPT.format(
bot_nickname=global_config.bot.nickname,
persona_core=context["persona"]["core"],
persona_side=context["persona"]["side"],
identity=context["persona"]["identity"],
current_time=context["current_time"],
target_user_or_group=target_user_or_group,
reason=reason,
topic=topic,
context_specific_block=context_specific_block,
mood_state=context["mood_state"],
)
if global_config.debug.show_prompt:
logger.info(f"主动思考回复器原始提示词:{plan_prompt}")
is_success, response, _, _ = await llm_api.generate_with_model(
prompt=plan_prompt, model_config=model_config.model_task_config.replyer
)
if is_success and response:
stream = self._get_stream_from_id(stream_id)
if stream:
# 使用消息分割器处理并发送消息
reply_set = generator_api.process_human_text(response, enable_splitter=True, enable_chinese_typo=False)
for reply_type, content in reply_set:
if reply_type == "text":
await send_api.text_to_stream(stream_id=stream.stream_id, text=content)
else:
logger.warning(f"无法发送消息,因为找不到 stream_id 为 {stream_id} 的聊天流")
def _get_stream_from_id(self, stream_id: str):
"""
根据 stream_id 解析并获取对应的聊天流对象。
Args:
stream_id: 聊天流的唯一标识符,格式为 "platform:chat_id:stream_type"
Returns:
对应的 ChatStream 对象,如果解析失败或找不到则返回 None。
"""
try:
platform, chat_id, stream_type = stream_id.split(":")
if stream_type == "private":
return chat_api.ChatManager.get_private_stream_by_user_id(platform=platform, user_id=chat_id)
elif stream_type == "group":
return chat_api.ChatManager.get_group_stream_by_group_id(platform=platform, group_id=chat_id)
except Exception as e:
logger.error(f"获取 stream_id ({stream_id}) 失败: {e}")
return None
async def _gather_context(self, stream_id: str) -> dict[str, Any] | None:
"""
收集构建决策和规划提示词所需的所有上下文信息。
此函数会根据聊天流是私聊还是群聊,收集不同的信息,
包括但不限于日程、聊天历史、人设、关系信息等。
Args:
stream_id: 聊天流ID。
Returns:
一个包含所有上下文信息的字典,如果找不到聊天流则返回 None。
"""
stream = self._get_stream_from_id(stream_id)
if not stream:
logger.warning(f"无法找到 stream_id 为 {stream_id} 的聊天流")
return None
# 1. 收集通用信息 (日程, 聊天历史, 动作历史)
schedules = await schedule_api.ScheduleAPI.get_today_schedule()
schedule_context = (
"\n".join([f"- {s.get('time_range', '未知时间')}: {s.get('activity', '未知活动')}" for s in schedules])
if schedules
else "今天没有日程安排。"
)
recent_messages = await message_api.get_recent_messages(
stream.stream_id, limit=50, limit_mode="latest", hours=12
)
recent_chat_history = (
await message_api.build_readable_messages_to_str(recent_messages) if recent_messages else ""
)
action_history_list = await get_actions_by_timestamp_with_chat(
chat_id=stream.stream_id,
timestamp_start=time.time() - 3600 * 24, # 过去24小时
timestamp_end=time.time(),
limit=7,
)
action_history_context = build_readable_actions(actions=action_history_list)
# 2. 构建基础上下文
mood_state = "暂时没有"
if global_config.mood.enable_mood:
try:
mood_state = mood_manager.get_mood_by_chat_id(stream.stream_id).mood_state
except Exception as e:
logger.error(f"获取情绪失败,原因:{e}")
base_context = {
"schedule_context": schedule_context,
"recent_chat_history": recent_chat_history,
"action_history_context": action_history_context,
"mood_state": mood_state,
"persona": {
"core": global_config.personality.personality_core,
"side": global_config.personality.personality_side,
"identity": global_config.personality.identity,
},
"current_time": datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
}
# 3. 根据聊天类型补充特定上下文
if stream.group_info: # 群聊场景
base_context.update(
{
"chat_type": "group",
"group_info": {"group_name": stream.group_info.group_name, "group_id": stream.group_info.group_id},
}
)
return base_context
elif stream.user_info: # 私聊场景
user_info = stream.user_info
if not user_info.platform or not user_info.user_id:
logger.warning(f"Stream {stream_id} 的 user_info 不完整")
return None
person_id = person_api.get_person_id(user_info.platform, int(user_info.user_id))
person_info_manager = get_person_info_manager()
person_info = await person_info_manager.get_values(person_id, ["user_id", "platform", "person_name"])
cross_context_block = await Prompt.build_cross_context(stream.stream_id, "s4u", person_info)
# 获取关系信息
short_impression = await person_info_manager.get_value(person_id, "short_impression") or ""
impression = await person_info_manager.get_value(person_id, "impression") or ""
attitude = await person_info_manager.get_value(person_id, "attitude") or 50
base_context.update(
{
"chat_type": "private",
"person_id": person_id,
"user_info": user_info,
"cross_context_block": cross_context_block,
"relationship": {
"short_impression": short_impression,
"impression": impression,
"attitude": attitude,
},
}
)
return base_context
else:
logger.warning(f"Stream {stream_id} 既没有 group_info 也没有 user_info")
return None
async def _make_decision(self, context: dict[str, Any], start_mode: str) -> dict[str, Any] | None:
"""
调用 LLM 进行决策,判断是否应该主动发起对话,以及聊什么话题。
"""
if context["chat_type"] not in ["private", "group"]:
return {"should_reply": False, "reason": "未知的聊天类型"}
# 根据聊天类型构建特定上下文
if context["chat_type"] == "private":
user_info = context["user_info"]
relationship = context["relationship"]
target_user_or_group = f"用户 '{user_info.user_nickname}'"
context_specific_block = f"""
1. **启动模式**: {start_mode} ({"初次见面/很久未见" if start_mode == "cold_start" else "日常唤醒"})
2. **你的日程**:
{context["schedule_context"]}
3. **你和Ta的关系**:
- 简短印象: {relationship["short_impression"]}
- 详细印象: {relationship["impression"]}
- 好感度: {relationship["attitude"]}/100
4. **和Ta在别处的讨论摘要**:
{context["cross_context_block"]}
5. **最近的聊天摘要**:
{context["recent_chat_history"]}
"""
else: # group
group_info = context["group_info"]
target_user_or_group = f"群聊 '{group_info['group_name']}'"
context_specific_block = f"""
1. **启动模式**: {start_mode} ({"首次加入/很久未发言" if start_mode == "cold_start" else "日常唤醒"})
2. **你的日程**:
{context["schedule_context"]}
3. **群聊信息**:
- 群名称: {group_info["group_name"]}
4. **最近的聊天摘要**:
{context["recent_chat_history"]}
"""
prompt = DECISION_PROMPT.format(
bot_nickname=global_config.bot.nickname,
persona_core=context["persona"]["core"],
persona_side=context["persona"]["side"],
identity=context["persona"]["identity"],
mood_state=context["mood_state"],
action_history_context=context["action_history_context"],
current_time=context["current_time"],
target_user_or_group=target_user_or_group,
context_specific_block=context_specific_block,
)
if global_config.debug.show_prompt:
logger.info(f"主动思考决策器原始提示词:{prompt}")
is_success, response, _, _ = await llm_api.generate_with_model(
prompt=prompt, model_config=model_config.model_task_config.utils
)
if not is_success:
return {"should_reply": False, "reason": "决策模型生成失败"}
try:
if global_config.debug.show_prompt:
logger.info(f"主动思考决策器响应:{response}")
decision = orjson.loads(response)
return decision
except orjson.JSONDecodeError:
logger.error(f"决策LLM返回的JSON格式无效: {response}")
return {"should_reply": False, "reason": "决策模型返回格式错误"}

View File

@@ -1,97 +0,0 @@
from src.chat.utils.prompt import Prompt
# =============================================================================
# 决策阶段 (Decision Phase)
# =============================================================================
DECISION_PROMPT = Prompt(
name="proactive_thinker_decision",
template="""
# 角色
你的名字是{bot_nickname},你的人设如下:
- 核心人设: {persona_core}
- 侧面人设: {persona_side}
- 身份: {identity}
你的当前情绪状态是: {mood_state}
# 你最近的相关决策历史 (供参考)
{action_history_context}
# 任务
现在是 {current_time},你需要根据当前的情境,决定是否要主动向{target_user_or_group}发起对话。
# 情境分析
{context_specific_block}
# 决策目标
你的最终目标是根据你的角色和当前情境,做出一个最符合人类社交直觉的决策,以求:
- **(私聊)深化关系**: 通过展现你的关心、记忆和个性来拉近与对方的距离。
- **(群聊)活跃气氛**: 提出能引起大家兴趣的话题,促进群聊的互动。
- **提供价值**: 你的出现应该是有意义的,无论是情感上的温暖,还是信息上的帮助。
- **保持自然**: 避免任何看起来像机器人或骚扰的行为。
# 决策指令
请综合以上所有信息以稳定、真实、拟人的方式做出决策。你的决策需要以JSON格式输出包含以下字段
- `should_reply`: bool, 是否应该发起对话。
- `topic`: str, 如果 `should_reply` 为 true你打算聊什么话题
- `reason`: str, 做出此决策的简要理由,需体现你对上述目标的考量。
# 决策流程与核心原则
1. **检查对话状态**:
- **最后发言者**: 查看【最近的聊天摘要】。如果最后一条消息是你发的,且对方尚未回复,**通常应选择不回复**。这是最重要的原则,以避免打扰。
- **例外**: 只有在等待时间足够长(例如超过数小时),或者你有非常重要且有时效性的新话题时,才考虑再次发言。
- **无人发言**: 如果最近的聊天记录里只有你一个人在说话,**绝对不要回复**,以防刷屏。
2. **寻找话题切入点 (如果可以回复)**:
- **强关联优先**: 优先从【情境分析】中寻找最自然、最相关的话题。顺序建议:`最近的聊天摘要` > `你和Ta的关系` > `你的日程`。
- **展现个性**: 结合你的【人设】和【情绪】,思考你会如何看待这些情境信息,并从中找到话题。
- **备选方案**: 如果实在没有强关联的话题,可以发起一个简单的日常问候。
3. **最终决策**:
- **权衡频率**: 查看【你最近的相关决策历史】。如果你在短时间内已经主动发起过多次对话,也应倾向于**不回复**,保持一定的社交距离。
- **质量胜于数量**: 宁可错过一次普通的互动机会,也不要进行一次尴尬或生硬的对话。
---
请输出你的决策:
"""
)
# =============================================================================
# 回复规划阶段 (Plan Phase)
# =============================================================================
PLAN_PROMPT = Prompt(
name="proactive_thinker_plan",
template="""
# 角色
你的名字是{bot_nickname},你的人设如下:
- 核心人设: {persona_core}
- 侧面人设: {persona_side}
- 身份: {identity}
# 任务
现在是 {current_time},你需要主动向{target_user_or_group}发起对话。
# 决策上下文
- **决策理由**: {reason}
# 情境分析
{context_specific_block}
# 对话指引
- 你决定和Ta聊聊关于“{topic}”的话题。
- **对话风格**:
- **自然开场**: 你可以根据话题和情境,选择最自然的开场方式。可以直接切入话题(如果话题关联性很强),也可以先用一句简单的问候作为过渡。**不要总是使用同一种开场白**。
- **融合情境**: 将【情境分析】中的信息巧妙地融入到对话中,让你的话语听起来更真实、更有依据。
- **符合人设**: 你的语气、用词、甚至表情符号的使用,都应该完全符合你的【角色】设定和当前【情绪】({mood_state})。
# 输出要求
- **简洁**: 不要输出任何多余内容如前缀、后缀、冒号、引号、at/@等)。
- **原创**: 不要重复之前的内容,即使意思相近也不行。
- **直接**: 只输出最终的回复文本本身。
- **风格**: 回复需简短、完整且口语化。
现在,你说:
"""
)

View File

@@ -1,5 +1,5 @@
[inner]
version = "7.5.2"
version = "7.5.3"
#----以下是给开发人员阅读的如果你只是部署了MoFox-Bot不需要阅读----
#如果你想要修改配置文件请递增version的值
@@ -568,30 +568,67 @@ relationship_tracking_interval_min = 300 # 关系追踪最小间隔时间(秒
relationship_tracking_cooldown_hours = 1.0 # 同一用户关系追踪冷却时间(小时)
[proactive_thinking] # 主动思考(主动发起对话)功能配置
# --- 总开关 ---
enable = true # 是否启用主动发起对话功能
# 详细配置说明请参考docs/proactive_thinking_config_guide.md
# --- 触发时机 ---
# 基础触发间隔AI会围绕这个时间点主动发起对话
interval = 1500 # 默认25分钟
# 间隔随机化标准差让触发时间更自然。设为0则为固定间隔。
interval_sigma = 120
# 每日活跃度调整,格式:[["", "HH:MM,factor", ...], ["stream_id", ...]]
# factor > 1.0 会缩短思考间隔更活跃factor < 1.0 会延长间隔。
talk_frequency_adjust = [["", "8:00,1", "12:00,1.2", "18:00,1.5", "01:00,0.6"]]
# --- 总开关 ---
enable = false # 是否启用主动发起对话功能
# --- 间隔配置 ---
base_interval = 1800 # 基础触发间隔默认30分钟
min_interval = 600 # 最小触发间隔默认10分钟
max_interval = 7200 # 最大触发间隔默认2小时
# 动态调整配置
use_interest_score = true # 是否根据兴趣分数动态调整间隔
interest_score_factor = 2.0 # 兴趣分数影响因子1.0-3.0
# 公式: interval = base_interval * (interest_score_factor - interest_score)
# 例如: interest_score=0.8, factor=2.0 -> interval = 1800 * 1.2 = 2160秒(36分钟)
# --- 黑白名单配置 ---
whitelist_mode = false # 是否启用白名单模式(启用后只对白名单中的聊天流生效)
blacklist_mode = false # 是否启用黑名单模式(启用后排除黑名单中的聊天流)
# 白名单配置(示例格式)
whitelist_private = [] # 私聊白名单,格式: ["qq:12345:private"]
whitelist_group = [] # 群聊白名单,格式: ["qq:123456:group"]
# 黑名单配置(示例格式)
blacklist_private = [] # 私聊黑名单,格式: ["qq:12345:private"]
blacklist_group = [] # 群聊黑名单,格式: ["qq:999999:group"]
# --- 作用范围 ---
enable_in_private = true # 是否允许在私聊中主动发起对话
enable_in_group = true # 是否允许在群聊中主动发起对话
# 私聊白名单,为空则对所有私聊生效
# 格式: ["platform:user_id", ...] e.g., ["qq:123456"]
enabled_private_chats = []
# 群聊白名单,为空则对所有群聊生效
# 格式: ["platform:group_id", ...] e.g., ["qq:7891011"]
enabled_group_chats = []
# --- 冷启动配置 (针对私聊) ---
# 对于白名单中不活跃的私聊,是否允许进行一次“冷启动”问候
enable_cold_start = true
# 冷启动后,该私聊的下一次主动思考需要等待的最小时间(秒)
cold_start_cooldown = 86400 # 默认24小时
# --- 兴趣分数阈值 ---
min_interest_score = 0.0 # 最低兴趣分数阈值,低于此值不会主动思考
max_interest_score = 1.0 # 最高兴趣分数阈值,高于此值不会主动思考
# --- 时间策略配置 ---
enable_time_strategy = false # 是否启用时间策略(根据时段调整频率)
quiet_hours_start = "00:00" # 安静时段开始时间,格式: "HH:MM"
quiet_hours_end = "07:00" # 安静时段结束时间,格式: "HH:MM"
active_hours_multiplier = 0.7 # 活跃时段间隔倍数,<1表示更频繁>1表示更稀疏
# --- 冷却与限制 ---
reply_reset_enabled = true # bot回复后是否重置定时器避免回复后立即又主动发言
topic_throw_cooldown = 3600 # 抛出话题后的冷却时间(秒),期间暂停主动思考
max_daily_proactive = 0 # 每个聊天流每天最多主动发言次数0表示不限制
# --- 决策权重配置 ---
do_nothing_weight = 0.4 # do_nothing动作的基础权重
simple_bubble_weight = 0.3 # simple_bubble动作的基础权重
throw_topic_weight = 0.3 # throw_topic动作的基础权重
# --- 调试与监控 ---
enable_statistics = true # 是否启用统计功能(记录触发次数、决策分布等)
log_decisions = false # 是否记录每次决策的详细日志(用于调试)
# --- 兼容旧配置(已废弃,建议删除) ---
interval = 1800 # [已废弃] 请使用 base_interval
interval_sigma = 120 # [已废弃] 随机化功能已移除
talk_frequency_adjust = [] # [已废弃] 请使用 enable_time_strategy 和相关配置
enabled_private_chats = [] # [已废弃] 请使用 whitelist_private
enabled_group_chats = [] # [已废弃] 请使用 whitelist_group
enable_cold_start = false # [已废弃] 冷启动功能已移除
cold_start_cooldown = 86400 # [已废弃] 冷启动功能已移除