diff --git a/src/chat/message_receive/uni_message_sender.py b/src/chat/message_receive/uni_message_sender.py index bd23402e2..9e5a5054a 100644 --- a/src/chat/message_receive/uni_message_sender.py +++ b/src/chat/message_receive/uni_message_sender.py @@ -23,6 +23,21 @@ async def send_message(message: MessageSending, show_log=True) -> bool: await get_global_api().send_message(message) if show_log: logger.info(f"已将消息 '{message_preview}' 发往平台'{message.message_info.platform}'") + + # 触发 AFTER_SEND 事件 + try: + from src.plugin_system.core.event_manager import event_manager + from src.plugin_system.base.component_types import EventType + + if message.chat_stream: + await event_manager.trigger_event( + EventType.AFTER_SEND, + kwargs={"stream_id": message.chat_stream.stream_id, "message": message}, + trigger_source="SYSTEM", + ) + except Exception as event_error: + logger.error(f"触发 AFTER_SEND 事件时出错: {event_error}") + return True except Exception as e: diff --git a/src/config/official_configs.py b/src/config/official_configs.py index 3f7622115..cc9885b8c 100644 --- a/src/config/official_configs.py +++ b/src/config/official_configs.py @@ -757,7 +757,62 @@ class ProactiveThinkingConfig(ValidatedConfigBase): ) # --- 冷启动配置 (针对私聊) --- - enable_cold_start: bool = Field(default=True, description="对于白名单中不活跃的私聊,是否允许进行一次“冷启动”问候") + enable_cold_start: bool = Field(default=True, description='对于白名单中不活跃的私聊,是否允许进行一次"冷启动"问候') cold_start_cooldown: int = Field( default=86400, description="冷启动后,该私聊的下一次主动思考需要等待的最小时间(秒)" ) + + # --- 新增:间隔配置 --- + base_interval: int = Field(default=1800, ge=60, description="基础触发间隔(秒),默认30分钟") + min_interval: int = Field(default=600, ge=60, description="最小触发间隔(秒),默认10分钟。兴趣分数高时会接近此值") + max_interval: int = Field(default=7200, ge=60, description="最大触发间隔(秒),默认2小时。兴趣分数低时会接近此值") + + # --- 新增:动态调整配置 --- + use_interest_score: bool = Field(default=True, description="是否根据兴趣分数动态调整间隔。关闭则使用固定base_interval") + interest_score_factor: float = Field(default=2.0, ge=1.0, le=3.0, description="兴趣分数影响因子。公式: interval = base * (factor - score)") + + # --- 新增:黑白名单配置 --- + whitelist_mode: bool = Field(default=False, description="是否启用白名单模式。启用后只对白名单中的聊天流生效") + blacklist_mode: bool = Field(default=False, description="是否启用黑名单模式。启用后排除黑名单中的聊天流") + + whitelist_private: list[str] = Field( + default_factory=list, + description='私聊白名单,格式: ["platform:user_id:private", "qq:12345:private"]' + ) + whitelist_group: list[str] = Field( + default_factory=list, + description='群聊白名单,格式: ["platform:group_id:group", "qq:123456:group"]' + ) + + blacklist_private: list[str] = Field( + default_factory=list, + description='私聊黑名单,格式: ["platform:user_id:private", "qq:12345:private"]' + ) + blacklist_group: list[str] = Field( + default_factory=list, + description='群聊黑名单,格式: ["platform:group_id:group", "qq:123456:group"]' + ) + + # --- 新增:兴趣分数阈值 --- + min_interest_score: float = Field(default=0.0, ge=0.0, le=1.0, description="最低兴趣分数阈值,低于此值不会主动思考") + max_interest_score: float = Field(default=1.0, ge=0.0, le=1.0, description="最高兴趣分数阈值,高于此值不会主动思考(用于限制过度活跃)") + + # --- 新增:时间策略配置 --- + enable_time_strategy: bool = Field(default=False, description="是否启用时间策略(根据时段调整频率)") + quiet_hours_start: str = Field(default="00:00", description='安静时段开始时间,格式: "HH:MM"') + quiet_hours_end: str = Field(default="07:00", description='安静时段结束时间,格式: "HH:MM"') + active_hours_multiplier: float = Field(default=0.7, ge=0.1, le=2.0, description="活跃时段间隔倍数,<1表示更频繁,>1表示更稀疏") + + # --- 新增:冷却与限制 --- + reply_reset_enabled: bool = Field(default=True, description="bot回复后是否重置定时器(避免回复后立即又主动发言)") + topic_throw_cooldown: int = Field(default=3600, ge=0, description="抛出话题后的冷却时间(秒),期间暂停主动思考") + max_daily_proactive: int = Field(default=0, ge=0, description="每个聊天流每天最多主动发言次数,0表示不限制") + + # --- 新增:决策权重配置 --- + do_nothing_weight: float = Field(default=0.4, ge=0.0, le=1.0, description="do_nothing动作的基础权重") + simple_bubble_weight: float = Field(default=0.3, ge=0.0, le=1.0, description="simple_bubble动作的基础权重") + throw_topic_weight: float = Field(default=0.3, ge=0.0, le=1.0, description="throw_topic动作的基础权重") + + # --- 新增:调试与监控 --- + enable_statistics: bool = Field(default=True, description="是否启用统计功能(记录触发次数、决策分布等)") + log_decisions: bool = Field(default=False, description="是否记录每次决策的详细日志(用于调试)") diff --git a/src/main.py b/src/main.py index 1400b3568..c23d887b3 100644 --- a/src/main.py +++ b/src/main.py @@ -422,6 +422,14 @@ MoFox_Bot(第三方修改版) except Exception as e: logger.error(f"注册API路由失败: {e}") + # 初始化统一调度器 + try: + from src.schedule.unified_scheduler import initialize_scheduler + + await initialize_scheduler() + except Exception as e: + logger.error(f"统一调度器初始化失败: {e}") + # 加载所有插件 plugin_manager.load_all_plugins() @@ -486,14 +494,6 @@ MoFox_Bot(第三方修改版) # 初始化计划相关组件 await self._init_planning_components() - # 初始化统一调度器 - try: - from src.schedule.unified_scheduler import initialize_scheduler - - await initialize_scheduler() - except Exception as e: - logger.error(f"统一调度器初始化失败: {e}") - # 触发启动事件 try: await event_manager.trigger_event(EventType.ON_START, permission_group="SYSTEM") diff --git a/src/plugins/built_in/affinity_flow_chatter/plugin.py b/src/plugins/built_in/affinity_flow_chatter/plugin.py index 6a8ee7fdb..9e5366f9a 100644 --- a/src/plugins/built_in/affinity_flow_chatter/plugin.py +++ b/src/plugins/built_in/affinity_flow_chatter/plugin.py @@ -68,4 +68,20 @@ class AffinityChatterPlugin(BasePlugin): except Exception as e: logger.error(f"加载 ChatStreamImpressionTool 时出错: {e}") + try: + # 延迟导入 ProactiveThinkingReplyHandler + from .proactive_thinking_event import ProactiveThinkingReplyHandler + + components.append((ProactiveThinkingReplyHandler.get_handler_info(), ProactiveThinkingReplyHandler)) + except Exception as e: + logger.error(f"加载 ProactiveThinkingReplyHandler 时出错: {e}") + + try: + # 延迟导入 ProactiveThinkingMessageHandler + from .proactive_thinking_event import ProactiveThinkingMessageHandler + + components.append((ProactiveThinkingMessageHandler.get_handler_info(), ProactiveThinkingMessageHandler)) + except Exception as e: + logger.error(f"加载 ProactiveThinkingMessageHandler 时出错: {e}") + return components diff --git a/src/plugins/built_in/affinity_flow_chatter/proactive_thinking_event.py b/src/plugins/built_in/affinity_flow_chatter/proactive_thinking_event.py new file mode 100644 index 000000000..029bcc22f --- /dev/null +++ b/src/plugins/built_in/affinity_flow_chatter/proactive_thinking_event.py @@ -0,0 +1,146 @@ +""" +主动思考事件处理器 +监听bot的reply事件,在reply后重置对应聊天流的主动思考定时任务 +""" + +from src.common.logger import get_logger +from src.plugin_system import BaseEventHandler, EventType +from src.plugin_system.base.base_event import HandlerResult +from src.plugins.built_in.affinity_flow_chatter.proactive_thinking_scheduler import ( + proactive_thinking_scheduler, +) + +logger = get_logger("proactive_thinking_event") + + +class ProactiveThinkingReplyHandler(BaseEventHandler): + """Reply事件处理器 + + 当bot回复某个聊天流后: + 1. 如果该聊天流的主动思考被暂停(因为抛出了话题),则恢复它 + 2. 无论是否暂停,都重置定时任务,重新开始计时 + """ + + handler_name: str = "proactive_thinking_reply_handler" + handler_description: str = "监听reply事件,重置主动思考定时任务" + init_subscribe: list[EventType | str] = [EventType.AFTER_SEND] + + async def execute(self, kwargs: dict | None) -> HandlerResult: + """处理reply事件 + + Args: + kwargs: 事件参数,应包含 stream_id + + Returns: + HandlerResult: 处理结果 + """ + if not kwargs: + return HandlerResult(success=True, continue_process=True, message=None) + + stream_id = kwargs.get("stream_id") + if not stream_id: + logger.warning("Reply事件缺少stream_id参数") + return HandlerResult(success=True, continue_process=True, message=None) + + try: + from src.config.config import global_config + + # 检查是否启用reply重置 + if not global_config.proactive_thinking.reply_reset_enabled: + return HandlerResult(success=True, continue_process=True, message=None) + + # 检查是否被暂停 + was_paused = await proactive_thinking_scheduler.is_paused(stream_id) + + if was_paused: + logger.info(f"检测到reply事件,聊天流 {stream_id} 之前因抛出话题而暂停,现在恢复") + + # 重置定时任务(这会自动清除暂停标记并创建新任务) + success = await proactive_thinking_scheduler.schedule_proactive_thinking(stream_id) + + if success: + if was_paused: + logger.info(f"聊天流 {stream_id} 的主动思考已恢复并重置") + else: + logger.debug(f"聊天流 {stream_id} 的主动思考定时任务已重置") + else: + logger.warning(f"重置聊天流 {stream_id} 的主动思考任务失败") + + except Exception as e: + logger.error(f"处理reply事件时出错: {e}", exc_info=True) + + # 总是继续处理其他handler + return HandlerResult(success=True, continue_process=True, message=None) + + +class ProactiveThinkingMessageHandler(BaseEventHandler): + """消息事件处理器 + + 当收到消息时,如果该聊天流还没有主动思考任务,则创建一个 + 这样可以确保新的聊天流也能获得主动思考功能 + """ + + handler_name: str = "proactive_thinking_message_handler" + handler_description: str = "监听消息事件,为新聊天流创建主动思考任务" + init_subscribe: list[EventType | str] = [EventType.ON_MESSAGE] + + async def execute(self, kwargs: dict | None) -> HandlerResult: + """处理消息事件 + + Args: + kwargs: 事件参数,格式为 {"message": MessageRecv} + + Returns: + HandlerResult: 处理结果 + """ + if not kwargs: + return HandlerResult(success=True, continue_process=True, message=None) + + # 从 kwargs 中获取 MessageRecv 对象 + message = kwargs.get("message") + if not message or not hasattr(message, "chat_stream"): + return HandlerResult(success=True, continue_process=True, message=None) + + # 从 chat_stream 获取 stream_id + chat_stream = message.chat_stream + if not chat_stream or not hasattr(chat_stream, "stream_id"): + return HandlerResult(success=True, continue_process=True, message=None) + + stream_id = chat_stream.stream_id + + try: + from src.config.config import global_config + + # 检查是否启用主动思考 + if not global_config.proactive_thinking.enable: + return HandlerResult(success=True, continue_process=True, message=None) + + # 检查该聊天流是否已经有任务 + task_info = await proactive_thinking_scheduler.get_task_info(stream_id) + if task_info: + # 已经有任务,不需要创建 + return HandlerResult(success=True, continue_process=True, message=None) + + # 从 message_info 获取平台和聊天ID信息 + message_info = message.message_info + platform = message_info.platform + is_group = message_info.group_info is not None + chat_id = message_info.group_info.group_id if is_group else message_info.user_info.user_id # type: ignore + + # 构造配置字符串 + stream_config = f"{platform}:{chat_id}:{'group' if is_group else 'private'}" + + # 检查黑白名单 + if not proactive_thinking_scheduler._check_whitelist_blacklist(stream_config): + return HandlerResult(success=True, continue_process=True, message=None) + + # 创建主动思考任务 + success = await proactive_thinking_scheduler.schedule_proactive_thinking(stream_id) + if success: + logger.info(f"为新聊天流 {stream_id} 创建了主动思考任务") + + except Exception as e: + logger.error(f"处理消息事件时出错: {e}", exc_info=True) + + # 总是继续处理其他handler + return HandlerResult(success=True, continue_process=True, message=None) diff --git a/src/plugins/built_in/affinity_flow_chatter/proactive_thinking_executor.py b/src/plugins/built_in/affinity_flow_chatter/proactive_thinking_executor.py new file mode 100644 index 000000000..d014f47b0 --- /dev/null +++ b/src/plugins/built_in/affinity_flow_chatter/proactive_thinking_executor.py @@ -0,0 +1,538 @@ +""" +主动思考执行器 +当定时任务触发时,负责搜集信息、调用LLM决策、并根据决策生成回复 +""" + +import json +from datetime import datetime +from typing import Any, Literal, Optional + +from sqlalchemy import select + +from src.chat.express.expression_learner import expression_learner_manager +from src.chat.express.expression_selector import expression_selector +from src.common.database.sqlalchemy_database_api import get_db_session +from src.common.database.sqlalchemy_models import ChatStreams +from src.common.logger import get_logger +from src.config.config import global_config, model_config +from src.individuality.individuality import Individuality +from src.llm_models.utils_model import LLMRequest +from src.plugin_system.apis import chat_api, message_api, send_api + +logger = get_logger("proactive_thinking_executor") + + +class ProactiveThinkingPlanner: + """主动思考规划器 + + 负责: + 1. 搜集信息(聊天流印象、话题关键词、历史聊天记录) + 2. 调用LLM决策:什么都不做/简单冒泡/抛出话题 + 3. 根据决策生成回复内容 + """ + + def __init__(self): + """初始化规划器""" + try: + self.decision_llm = LLMRequest( + model_set=model_config.model_task_config.utils, + request_type="proactive_thinking_decision" + ) + self.reply_llm = LLMRequest( + model_set=model_config.model_task_config.replyer, + request_type="proactive_thinking_reply" + ) + except Exception as e: + logger.error(f"初始化LLM失败: {e}") + self.decision_llm = None + self.reply_llm = None + + async def gather_context(self, stream_id: str) -> Optional[dict[str, Any]]: + """搜集聊天流的上下文信息 + + Args: + stream_id: 聊天流ID + + Returns: + dict: 包含所有上下文信息的字典,失败返回None + """ + try: + # 1. 获取聊天流印象数据 + stream_data = await self._get_stream_impression(stream_id) + if not stream_data: + logger.warning(f"无法获取聊天流 {stream_id} 的印象数据") + return None + + # 2. 获取最近的聊天记录 + recent_messages = await message_api.get_recent_messages( + chat_id=stream_id, + limit=20, + limit_mode="latest", + hours=24 + ) + + recent_chat_history = "" + if recent_messages: + recent_chat_history = await message_api.build_readable_messages_to_str(recent_messages) + + # 3. 获取bot人设 + individuality = Individuality() + bot_personality = await individuality.get_personality_block() + + # 4. 构建上下文 + context = { + "stream_id": stream_id, + "stream_name": stream_data.get("stream_name", "未知"), + "stream_impression": stream_data.get("stream_impression_text", "暂无印象"), + "chat_style": stream_data.get("stream_chat_style", "未知"), + "topic_keywords": stream_data.get("stream_topic_keywords", ""), + "interest_score": stream_data.get("stream_interest_score", 0.5), + "recent_chat_history": recent_chat_history or "暂无最近聊天记录", + "bot_personality": bot_personality, + "current_time": datetime.now().strftime("%Y-%m-%d %H:%M:%S"), + } + + logger.debug(f"成功搜集聊天流 {stream_id} 的上下文信息") + return context + + except Exception as e: + logger.error(f"搜集上下文信息失败: {e}", exc_info=True) + return None + + async def _get_stream_impression(self, stream_id: str) -> Optional[dict[str, Any]]: + """从数据库获取聊天流印象数据""" + try: + async with get_db_session() as session: + stmt = select(ChatStreams).where(ChatStreams.stream_id == stream_id) + result = await session.execute(stmt) + stream = result.scalar_one_or_none() + + if not stream: + return None + + return { + "stream_name": stream.group_name or "私聊", + "stream_impression_text": stream.stream_impression_text or "", + "stream_chat_style": stream.stream_chat_style or "", + "stream_topic_keywords": stream.stream_topic_keywords or "", + "stream_interest_score": float(stream.stream_interest_score) if stream.stream_interest_score else 0.5, + } + + except Exception as e: + logger.error(f"获取聊天流印象失败: {e}") + return None + + async def make_decision( + self, context: dict[str, Any] + ) -> Optional[dict[str, Any]]: + """使用LLM进行决策 + + Args: + context: 上下文信息 + + Returns: + dict: 决策结果,包含: + - action: "do_nothing" | "simple_bubble" | "throw_topic" + - reasoning: 决策理由 + - topic: (可选) 如果是throw_topic,包含话题内容 + """ + if not self.decision_llm: + logger.error("决策LLM未初始化") + return None + + response = None + try: + decision_prompt = self._build_decision_prompt(context) + + if global_config.debug.show_prompt: + logger.info(f"决策提示词:\n{decision_prompt}") + + response, _ = await self.decision_llm.generate_response_async(prompt=decision_prompt) + + if not response: + logger.warning("LLM未返回有效响应") + return None + + # 清理并解析JSON响应 + cleaned_response = self._clean_json_response(response) + decision = json.loads(cleaned_response) + + logger.info( + f"决策结果: {decision.get('action', 'unknown')} - {decision.get('reasoning', '无理由')}" + ) + + return decision + + except json.JSONDecodeError as e: + logger.error(f"解析决策JSON失败: {e}") + if response: + logger.debug(f"原始响应: {response}") + return None + except Exception as e: + logger.error(f"决策过程失败: {e}", exc_info=True) + return None + + def _build_decision_prompt(self, context: dict[str, Any]) -> str: + """构建决策提示词""" + return f"""你是一个有着独特个性的AI助手。你的人设是: +{context['bot_personality']} + +现在是 {context['current_time']},你正在考虑是否要主动在 "{context['stream_name']}" 中说些什么。 + +【聊天环境信息】 +- 整体印象: {context['stream_impression']} +- 聊天风格: {context['chat_style']} +- 常见话题: {context['topic_keywords'] or '暂无'} +- 你的兴趣程度: {context['interest_score']:.2f}/1.0 + +【最近的聊天记录】 +{context['recent_chat_history']} + +请根据以上信息,决定你现在应该做什么: + +**选项1:什么都不做 (do_nothing)** +- 适用场景:现在可能是休息时间、工作时间,或者气氛不适合说话 +- 也可能是:最近聊天很活跃不需要你主动、没什么特别想说的、此时说话会显得突兀 + +**选项2:简单冒个泡 (simple_bubble)** +- 适用场景:群里有点冷清,你想引起注意或活跃气氛 +- 方式:简单问个好、发个表情、说句无关紧要的话,没有深意,就是刷个存在感 + +**选项3:抛出一个话题 (throw_topic)** +- 适用场景:历史消息中有未讨论完的话题、你有自己的想法、或者想深入聊某个主题 +- 方式:明确提出一个话题,希望得到回应和讨论 + +请以JSON格式回复你的决策: +{{ + "action": "do_nothing" | "simple_bubble" | "throw_topic", + "reasoning": "你的决策理由,说明为什么选择这个行动", + "topic": "(仅当action=throw_topic时填写)你想抛出的具体话题" +}} + +注意: +1. 如果最近聊天很活跃(不到1小时),倾向于选择 do_nothing +2. 如果你对这个环境兴趣不高(<0.4),倾向于选择 do_nothing 或 simple_bubble +3. 只有在真的有话题想聊时才选择 throw_topic +4. 符合你的人设,不要太过热情或冷淡 +""" + + async def generate_reply( + self, + context: dict[str, Any], + action: Literal["simple_bubble", "throw_topic"], + topic: Optional[str] = None + ) -> Optional[str]: + """生成回复内容 + + Args: + context: 上下文信息 + action: 动作类型 + topic: (可选) 话题内容,当action=throw_topic时必须提供 + + Returns: + str: 生成的回复文本,失败返回None + """ + if not self.reply_llm: + logger.error("回复LLM未初始化") + return None + + try: + reply_prompt = await self._build_reply_prompt(context, action, topic) + + if global_config.debug.show_prompt: + logger.info(f"回复提示词:\n{reply_prompt}") + + response, _ = await self.reply_llm.generate_response_async(prompt=reply_prompt) + + if not response: + logger.warning("LLM未返回有效回复") + return None + + logger.info(f"生成回复成功: {response[:50]}...") + return response.strip() + + except Exception as e: + logger.error(f"生成回复失败: {e}", exc_info=True) + return None + + async def _get_expression_habits(self, stream_id: str, chat_history: str) -> str: + """获取表达方式参考 + + Args: + stream_id: 聊天流ID + chat_history: 聊天历史 + + Returns: + str: 格式化的表达方式参考文本 + """ + try: + # 使用表达方式选择器获取合适的表达方式 + selected_expressions = await expression_selector.select_suitable_expressions( + chat_id=stream_id, + chat_history=chat_history, + target_message=None, # 主动思考没有target message + max_num=6, # 主动思考时使用较少的表达方式 + min_num=2 + ) + + if not selected_expressions: + return "" + + style_habits = [] + grammar_habits = [] + + for expr in selected_expressions: + if isinstance(expr, dict) and "situation" in expr and "style" in expr: + expr_type = expr.get("type", "style") + if expr_type == "grammar": + grammar_habits.append(f"当{expr['situation']}时,使用 {expr['style']}") + else: + style_habits.append(f"当{expr['situation']}时,使用 {expr['style']}") + + expression_block = "" + if style_habits or grammar_habits: + expression_block = "\n【表达方式参考】\n" + if style_habits: + expression_block += "语言习惯:\n" + "\n".join(style_habits) + "\n" + if grammar_habits: + expression_block += "句法特点:\n" + "\n".join(grammar_habits) + "\n" + expression_block += "注意:仅在情景合适时自然地使用这些表达,不要生硬套用。\n" + + return expression_block + + except Exception as e: + logger.warning(f"获取表达方式失败: {e}") + return "" + + async def _build_reply_prompt( + self, + context: dict[str, Any], + action: Literal["simple_bubble", "throw_topic"], + topic: Optional[str] + ) -> str: + """构建回复提示词""" + # 获取表达方式参考 + expression_habits = await self._get_expression_habits( + stream_id=context.get('stream_id', ''), + chat_history=context.get('recent_chat_history', '') + ) + + if action == "simple_bubble": + return f"""你是一个有着独特个性的AI助手。你的人设是: +{context['bot_personality']} + +现在是 {context['current_time']},你决定在 "{context['stream_name']}" 中简单冒个泡。 + +【聊天环境】 +- 整体印象: {context['stream_impression']} +- 聊天风格: {context['chat_style']} + +【最近的聊天记录】 +{context['recent_chat_history']} +{expression_habits} +请生成一条简短的消息,用于活跃气氛或刷存在感。要求: +1. 非常简短(5-15字) +2. 轻松随意,不要有明确的话题或问题 +3. 可以是:问候、表达心情、随口一句话 +4. 符合你的人设和当前聊天风格 +5. 如果有表达方式参考,在合适时自然使用 + +直接输出消息内容,不要解释:""" + + else: # throw_topic + return f"""你是一个有着独特个性的AI助手。你的人设是: +{context['bot_personality']} + +现在是 {context['current_time']},你决定在 "{context['stream_name']}" 中抛出一个话题。 + +【聊天环境】 +- 整体印象: {context['stream_impression']} +- 聊天风格: {context['chat_style']} +- 常见话题: {context['topic_keywords'] or '暂无'} + +【最近的聊天记录】 +{context['recent_chat_history']} + +【你想抛出的话题】 +{topic} +{expression_habits} +请根据这个话题生成一条消息,要求: +1. 明确提出话题,引导讨论 +2. 长度适中(20-50字) +3. 自然地引入话题,不要生硬 +4. 可以结合最近的聊天记录 +5. 符合你的人设和当前聊天风格 +6. 如果有表达方式参考,在合适时自然使用 + +直接输出消息内容,不要解释:""" + + def _clean_json_response(self, response: str) -> str: + """清理LLM响应中的JSON格式标记""" + import re + + cleaned = response.strip() + cleaned = re.sub(r"^```(?:json)?\s*", "", cleaned, flags=re.MULTILINE | re.IGNORECASE) + cleaned = re.sub(r"\s*```$", "", cleaned, flags=re.MULTILINE) + + json_start = cleaned.find("{") + json_end = cleaned.rfind("}") + + if json_start != -1 and json_end != -1 and json_end > json_start: + cleaned = cleaned[json_start:json_end + 1] + + return cleaned.strip() + + +# 全局规划器实例 +_planner = ProactiveThinkingPlanner() + +# 统计数据 +_statistics: dict[str, dict[str, Any]] = {} + + +def _update_statistics(stream_id: str, action: str): + """更新统计数据 + + Args: + stream_id: 聊天流ID + action: 执行的动作 + """ + if stream_id not in _statistics: + _statistics[stream_id] = { + "total_executions": 0, + "do_nothing_count": 0, + "simple_bubble_count": 0, + "throw_topic_count": 0, + "last_execution_time": None, + } + + _statistics[stream_id]["total_executions"] += 1 + _statistics[stream_id][f"{action}_count"] += 1 + _statistics[stream_id]["last_execution_time"] = datetime.now().isoformat() + + +def get_statistics(stream_id: Optional[str] = None) -> dict[str, Any]: + """获取统计数据 + + Args: + stream_id: 聊天流ID,None表示获取所有统计 + + Returns: + 统计数据字典 + """ + if stream_id: + return _statistics.get(stream_id, {}) + return _statistics + + +async def execute_proactive_thinking(stream_id: str): + """执行主动思考(被调度器调用的回调函数) + + Args: + stream_id: 聊天流ID + """ + from src.config.config import global_config + from src.plugins.built_in.affinity_flow_chatter.proactive_thinking_scheduler import ( + proactive_thinking_scheduler, + ) + + config = global_config.proactive_thinking + + logger.info(f"开始为聊天流 {stream_id} 执行主动思考") + + try: + # 0. 前置检查 + # 检查是否在安静时段 + if proactive_thinking_scheduler._is_in_quiet_hours(): + logger.debug(f"当前在安静时段,跳过主动思考") + return + + # 检查每日限制 + if not proactive_thinking_scheduler._check_daily_limit(stream_id): + logger.info(f"聊天流 {stream_id} 今日主动发言次数已达上限") + return + + # 1. 搜集信息 + context = await _planner.gather_context(stream_id) + if not context: + logger.warning(f"无法搜集聊天流 {stream_id} 的上下文,跳过本次主动思考") + return + + # 检查兴趣分数阈值 + interest_score = context.get('interest_score', 0.5) + if not proactive_thinking_scheduler._check_interest_score_threshold(interest_score): + logger.info(f"聊天流 {stream_id} 兴趣分数不在阈值范围内") + return + + # 2. 进行决策 + decision = await _planner.make_decision(context) + if not decision: + logger.warning(f"决策失败,跳过本次主动思考") + return + + action = decision.get("action", "do_nothing") + reasoning = decision.get("reasoning", "无") + + # 记录决策日志 + if config.log_decisions: + logger.info(f"[决策详情] stream_id={stream_id}, action={action}, reasoning={reasoning}") + + # 3. 根据决策执行相应动作 + if action == "do_nothing": + logger.info(f"决策:什么都不做。理由:{reasoning}") + return + + elif action == "simple_bubble": + logger.info(f"决策:简单冒个泡。理由:{reasoning}") + + # 生成简单的消息 + reply = await _planner.generate_reply(context, "simple_bubble") + if reply: + await send_api.text_to_stream(stream_id=stream_id, text=reply) + logger.info(f"已发送冒泡消息到 {stream_id}") + + # 增加每日计数 + proactive_thinking_scheduler._increment_daily_count(stream_id) + + # 更新统计 + if config.enable_statistics: + _update_statistics(stream_id, action) + + # 冒泡后可以继续主动思考,不需要暂停 + + elif action == "throw_topic": + topic = decision.get("topic", "") + logger.info(f"决策:抛出话题。理由:{reasoning},话题:{topic}") + + if not topic: + logger.warning("选择了抛出话题但未提供话题内容,降级为冒泡") + reply = await _planner.generate_reply(context, "simple_bubble") + else: + # 生成基于话题的消息 + reply = await _planner.generate_reply(context, "throw_topic", topic) + + if reply: + await send_api.text_to_stream(stream_id=stream_id, text=reply) + logger.info(f"已发送话题消息到 {stream_id}") + + # 增加每日计数 + proactive_thinking_scheduler._increment_daily_count(stream_id) + + # 更新统计 + if config.enable_statistics: + _update_statistics(stream_id, action) + + # 抛出话题后暂停主动思考(如果配置了冷却时间) + if config.topic_throw_cooldown > 0: + await proactive_thinking_scheduler.pause_proactive_thinking(stream_id, reason="已抛出话题") + + # 设置定时恢复(在reply_reset_enabled关闭时使用) + if not config.reply_reset_enabled: + import asyncio + async def resume_after_cooldown(): + await asyncio.sleep(config.topic_throw_cooldown) + await proactive_thinking_scheduler.schedule_proactive_thinking(stream_id) + asyncio.create_task(resume_after_cooldown()) + + except Exception as e: + logger.error(f"执行主动思考失败: {e}", exc_info=True) diff --git a/src/plugins/built_in/affinity_flow_chatter/proactive_thinking_scheduler.py b/src/plugins/built_in/affinity_flow_chatter/proactive_thinking_scheduler.py new file mode 100644 index 000000000..121918a31 --- /dev/null +++ b/src/plugins/built_in/affinity_flow_chatter/proactive_thinking_scheduler.py @@ -0,0 +1,498 @@ +""" +主动思考调度器 +基于统一调度器(unified_scheduler),为每个聊天流管理主动思考任务 +根据聊天流的兴趣分数(stream_interest_score)动态计算触发间隔 +""" + +import asyncio +from datetime import datetime, timedelta +from typing import Any, Optional + +from src.common.database.sqlalchemy_database_api import get_db_session +from src.common.database.sqlalchemy_models import ChatStreams +from src.common.logger import get_logger +from src.schedule.unified_scheduler import TriggerType, unified_scheduler +from sqlalchemy import select + +logger = get_logger("proactive_thinking_scheduler") + + +class ProactiveThinkingScheduler: + """主动思考调度器 + + 负责为每个聊天流创建和管理主动思考任务。 + 特点: + 1. 根据聊天流的兴趣分数动态计算触发间隔 + 2. 在bot回复后自动重置定时任务 + 3. 抛出话题后暂停,等待下次reply后才恢复 + """ + + def __init__(self): + """初始化调度器""" + self._stream_schedules: dict[str, str] = {} # stream_id -> schedule_id + self._paused_streams: set[str] = set() # 因抛出话题而暂停的聊天流 + self._lock = asyncio.Lock() + + # 统计数据 + self._statistics: dict[str, dict[str, Any]] = {} # stream_id -> 统计信息 + self._daily_counts: dict[str, dict[str, int]] = {} # stream_id -> {date: count} + + # 从全局配置加载(延迟导入避免循环依赖) + from src.config.config import global_config + self.config = global_config.proactive_thinking + + def _calculate_interval(self, interest_score: float) -> int: + """根据兴趣分数计算触发间隔 + + Args: + interest_score: 聊天流兴趣分数 (0.0-1.0) + + Returns: + int: 触发间隔(秒) + + 公式: + - 兴趣分数越高,间隔越短(更频繁思考) + - interval = base_interval * (factor - interest_score) + - 例如:interest_score=0.5 -> interval=1800*1.5=2700秒(45分钟) + - interest_score=0.8 -> interval=1800*1.2=2160秒(36分钟) + - interest_score=0.2 -> interval=1800*1.8=3240秒(54分钟) + """ + # 如果不使用兴趣分数,直接返回基础间隔 + if not self.config.use_interest_score: + return self.config.base_interval + + # 确保分数在有效范围内 + interest_score = max(0.0, min(1.0, interest_score)) + + # 计算间隔:分数越高,系数越小,间隔越短 + factor = self.config.interest_score_factor - interest_score + interval = int(self.config.base_interval * factor) + + # 限制在最小和最大间隔之间 + interval = max(self.config.min_interval, min(self.config.max_interval, interval)) + + logger.debug(f"兴趣分数 {interest_score:.2f} -> 触发间隔 {interval}秒 ({interval/60:.1f}分钟)") + return interval + + def _check_whitelist_blacklist(self, stream_config: str) -> bool: + """检查聊天流是否通过黑白名单验证 + + Args: + stream_config: 聊天流配置字符串,格式: "platform:id:type" + + Returns: + bool: True表示允许主动思考,False表示拒绝 + """ + # 解析类型 + parts = stream_config.split(":") + if len(parts) != 3: + logger.warning(f"无效的stream_config格式: {stream_config}") + return False + + is_private = parts[2] == "private" + + # 检查基础开关 + if is_private and not self.config.enable_in_private: + return False + if not is_private and not self.config.enable_in_group: + return False + + # 黑名单检查(优先级高) + if self.config.blacklist_mode: + blacklist = self.config.blacklist_private if is_private else self.config.blacklist_group + if stream_config in blacklist: + logger.debug(f"聊天流 {stream_config} 在黑名单中,拒绝主动思考") + return False + + # 白名单检查 + if self.config.whitelist_mode: + whitelist = self.config.whitelist_private if is_private else self.config.whitelist_group + if stream_config not in whitelist: + logger.debug(f"聊天流 {stream_config} 不在白名单中,拒绝主动思考") + return False + + return True + + def _check_interest_score_threshold(self, interest_score: float) -> bool: + """检查兴趣分数是否在阈值范围内 + + Args: + interest_score: 兴趣分数 + + Returns: + bool: True表示在范围内 + """ + if interest_score < self.config.min_interest_score: + logger.debug(f"兴趣分数 {interest_score:.2f} 低于最低阈值 {self.config.min_interest_score}") + return False + + if interest_score > self.config.max_interest_score: + logger.debug(f"兴趣分数 {interest_score:.2f} 高于最高阈值 {self.config.max_interest_score}") + return False + + return True + + def _check_daily_limit(self, stream_id: str) -> bool: + """检查今日主动发言次数是否超限 + + Args: + stream_id: 聊天流ID + + Returns: + bool: True表示未超限 + """ + if self.config.max_daily_proactive == 0: + return True # 不限制 + + today = datetime.now().strftime("%Y-%m-%d") + + if stream_id not in self._daily_counts: + self._daily_counts[stream_id] = {} + + # 清理过期日期的数据 + for date in list(self._daily_counts[stream_id].keys()): + if date != today: + del self._daily_counts[stream_id][date] + + count = self._daily_counts[stream_id].get(today, 0) + + if count >= self.config.max_daily_proactive: + logger.debug(f"聊天流 {stream_id} 今日主动发言次数已达上限 ({count}/{self.config.max_daily_proactive})") + return False + + return True + + def _increment_daily_count(self, stream_id: str): + """增加今日主动发言计数""" + today = datetime.now().strftime("%Y-%m-%d") + + if stream_id not in self._daily_counts: + self._daily_counts[stream_id] = {} + + self._daily_counts[stream_id][today] = self._daily_counts[stream_id].get(today, 0) + 1 + + def _is_in_quiet_hours(self) -> bool: + """检查当前是否在安静时段 + + Returns: + bool: True表示在安静时段 + """ + if not self.config.enable_time_strategy: + return False + + now = datetime.now() + current_time = now.strftime("%H:%M") + + start = self.config.quiet_hours_start + end = self.config.quiet_hours_end + + # 处理跨日的情况(如23:00-07:00) + if start <= end: + return start <= current_time <= end + else: + return current_time >= start or current_time <= end + + async def _get_stream_interest_score(self, stream_id: str) -> float: + """从数据库获取聊天流的兴趣分数 + + Args: + stream_id: 聊天流ID + + Returns: + float: 兴趣分数,默认0.5 + """ + try: + async with get_db_session() as session: + stmt = select(ChatStreams).where(ChatStreams.stream_id == stream_id) + result = await session.execute(stmt) + stream = result.scalar_one_or_none() + + if stream and stream.stream_interest_score is not None: + return float(stream.stream_interest_score) + else: + return 0.5 # 默认中等兴趣 + + except Exception as e: + logger.error(f"获取聊天流 {stream_id} 兴趣分数失败: {e}") + return 0.5 + + async def schedule_proactive_thinking(self, stream_id: str) -> bool: + """为聊天流创建或重置主动思考任务 + + Args: + stream_id: 聊天流ID + + Returns: + bool: 是否成功创建/重置任务 + """ + try: + async with self._lock: + # 如果该流因抛出话题而暂停,先清除暂停标记 + if stream_id in self._paused_streams: + logger.info(f"清除聊天流 {stream_id} 的暂停标记") + self._paused_streams.discard(stream_id) + + # 如果已经有任务,先移除 + if stream_id in self._stream_schedules: + old_schedule_id = self._stream_schedules[stream_id] + await unified_scheduler.remove_schedule(old_schedule_id) + logger.debug(f"移除聊天流 {stream_id} 的旧任务") + + # 获取兴趣分数并计算间隔 + interest_score = await self._get_stream_interest_score(stream_id) + interval_seconds = self._calculate_interval(interest_score) + + # 导入回调函数(延迟导入避免循环依赖) + from src.plugins.built_in.affinity_flow_chatter.proactive_thinking_executor import ( + execute_proactive_thinking, + ) + + # 创建新任务 + schedule_id = await unified_scheduler.create_schedule( + callback=execute_proactive_thinking, + trigger_type=TriggerType.TIME, + trigger_config={ + "delay_seconds": interval_seconds, + }, + is_recurring=True, + task_name=f"ProactiveThinking-{stream_id}", + callback_args=(stream_id,), + ) + + self._stream_schedules[stream_id] = schedule_id + + # 计算下次触发时间 + next_run_time = datetime.now() + timedelta(seconds=interval_seconds) + + logger.info( + f"为聊天流 {stream_id} 创建主动思考任务\n" + f" - 兴趣分数: {interest_score:.2f}\n" + f" - 触发间隔: {interval_seconds}秒 ({interval_seconds/60:.1f}分钟)\n" + f" - 下次触发: {next_run_time.strftime('%Y-%m-%d %H:%M:%S')}" + ) + return True + + except Exception as e: + logger.error(f"为聊天流 {stream_id} 创建主动思考任务失败: {e}", exc_info=True) + return False + + async def pause_proactive_thinking(self, stream_id: str, reason: str = "抛出话题") -> bool: + """暂停聊天流的主动思考任务 + + 当选择"抛出话题"后,应该暂停该聊天流的主动思考, + 直到bot至少执行过一次reply后才恢复。 + + Args: + stream_id: 聊天流ID + reason: 暂停原因 + + Returns: + bool: 是否成功暂停 + """ + try: + async with self._lock: + if stream_id not in self._stream_schedules: + logger.warning(f"尝试暂停不存在的任务: {stream_id}") + return False + + schedule_id = self._stream_schedules[stream_id] + success = await unified_scheduler.pause_schedule(schedule_id) + + if success: + self._paused_streams.add(stream_id) + logger.info(f"暂停聊天流 {stream_id} 的主动思考任务,原因: {reason}") + + return success + + except Exception as e: + logger.error(f"暂停聊天流 {stream_id} 的主动思考任务失败: {e}", exc_info=True) + return False + + async def resume_proactive_thinking(self, stream_id: str) -> bool: + """恢复聊天流的主动思考任务 + + Args: + stream_id: 聊天流ID + + Returns: + bool: 是否成功恢复 + """ + try: + async with self._lock: + if stream_id not in self._stream_schedules: + logger.warning(f"尝试恢复不存在的任务: {stream_id}") + return False + + schedule_id = self._stream_schedules[stream_id] + success = await unified_scheduler.resume_schedule(schedule_id) + + if success: + self._paused_streams.discard(stream_id) + logger.info(f"恢复聊天流 {stream_id} 的主动思考任务") + + return success + + except Exception as e: + logger.error(f"恢复聊天流 {stream_id} 的主动思考任务失败: {e}", exc_info=True) + return False + + async def cancel_proactive_thinking(self, stream_id: str) -> bool: + """取消聊天流的主动思考任务 + + Args: + stream_id: 聊天流ID + + Returns: + bool: 是否成功取消 + """ + try: + async with self._lock: + if stream_id not in self._stream_schedules: + return True # 已经不存在,视为成功 + + schedule_id = self._stream_schedules.pop(stream_id) + self._paused_streams.discard(stream_id) + + success = await unified_scheduler.remove_schedule(schedule_id) + logger.info(f"取消聊天流 {stream_id} 的主动思考任务") + + return success + + except Exception as e: + logger.error(f"取消聊天流 {stream_id} 的主动思考任务失败: {e}", exc_info=True) + return False + + async def is_paused(self, stream_id: str) -> bool: + """检查聊天流的主动思考是否被暂停 + + Args: + stream_id: 聊天流ID + + Returns: + bool: 是否暂停中 + """ + async with self._lock: + return stream_id in self._paused_streams + + async def get_task_info(self, stream_id: str) -> Optional[dict[str, Any]]: + """获取聊天流的主动思考任务信息 + + Args: + stream_id: 聊天流ID + + Returns: + dict: 任务信息,如果不存在返回None + """ + async with self._lock: + if stream_id not in self._stream_schedules: + return None + + schedule_id = self._stream_schedules[stream_id] + task_info = await unified_scheduler.get_task_info(schedule_id) + + if task_info: + task_info["is_paused_for_topic"] = stream_id in self._paused_streams + + return task_info + + async def list_all_tasks(self) -> list[dict[str, Any]]: + """列出所有主动思考任务 + + Returns: + list: 任务信息列表 + """ + async with self._lock: + tasks = [] + for stream_id, schedule_id in self._stream_schedules.items(): + task_info = await unified_scheduler.get_task_info(schedule_id) + if task_info: + task_info["stream_id"] = stream_id + task_info["is_paused_for_topic"] = stream_id in self._paused_streams + tasks.append(task_info) + return tasks + + def get_statistics(self) -> dict[str, Any]: + """获取调度器统计信息 + + Returns: + dict: 统计信息 + """ + return { + "total_scheduled_streams": len(self._stream_schedules), + "paused_for_topic": len(self._paused_streams), + "active_tasks": len(self._stream_schedules) - len(self._paused_streams), + } + + async def log_next_trigger_times(self, max_streams: int = 10): + """在日志中输出聊天流的下次触发时间 + + Args: + max_streams: 最多显示多少个聊天流,0表示全部 + """ + logger.info("=" * 60) + logger.info("主动思考任务状态") + logger.info("=" * 60) + + tasks = await self.list_all_tasks() + + if not tasks: + logger.info("当前没有活跃的主动思考任务") + logger.info("=" * 60) + return + + # 按下次触发时间排序 + tasks_sorted = sorted( + tasks, + key=lambda x: x.get("next_run_time", datetime.max) or datetime.max + ) + + # 限制显示数量 + if max_streams > 0: + tasks_sorted = tasks_sorted[:max_streams] + + logger.info(f"共有 {len(self._stream_schedules)} 个任务,显示前 {len(tasks_sorted)} 个") + logger.info("") + + for i, task in enumerate(tasks_sorted, 1): + stream_id = task.get("stream_id", "Unknown") + next_run = task.get("next_run_time") + is_paused = task.get("is_paused_for_topic", False) + + # 获取聊天流名称(如果可能) + stream_name = stream_id[:16] + "..." if len(stream_id) > 16 else stream_id + + if next_run: + # 计算剩余时间 + now = datetime.now() + remaining = next_run - now + remaining_seconds = int(remaining.total_seconds()) + + if remaining_seconds < 0: + time_str = "已过期(待执行)" + elif remaining_seconds < 60: + time_str = f"{remaining_seconds}秒后" + elif remaining_seconds < 3600: + time_str = f"{remaining_seconds // 60}分钟后" + else: + hours = remaining_seconds // 3600 + minutes = (remaining_seconds % 3600) // 60 + time_str = f"{hours}小时{minutes}分钟后" + + status = "⏸️ 暂停中" if is_paused else "✅ 活跃" + + logger.info( + f"[{i:2d}] {status} | {stream_name}\n" + f" 下次触发: {next_run.strftime('%Y-%m-%d %H:%M:%S')} ({time_str})" + ) + else: + logger.info( + f"[{i:2d}] ⚠️ 未知 | {stream_name}\n" + f" 下次触发: 未设置" + ) + + logger.info("") + logger.info("=" * 60) + + +# 全局调度器实例 +proactive_thinking_scheduler = ProactiveThinkingScheduler() diff --git a/src/plugins/built_in/proactive_thinker/__init__.py b/src/plugins/built_in/proactive_thinker/__init__.py deleted file mode 100644 index 176db6dbe..000000000 --- a/src/plugins/built_in/proactive_thinker/__init__.py +++ /dev/null @@ -1,14 +0,0 @@ -from src.plugin_system.base.plugin_metadata import PluginMetadata - -__plugin_meta__ = PluginMetadata( - name="MoFox-Bot主动思考", - description="主动思考插件", - usage="该插件由系统自动触发。", - version="1.0.0", - author="MoFox-Studio", - license="GPL-v3.0-or-later", - repository_url="https://github.com/MoFox-Studio", - keywords=["主动思考", "自己发消息"], - categories=["Chat", "Integration"], - extra={"is_built_in": True, "plugin_type": "functional"}, -) diff --git a/src/plugins/built_in/proactive_thinker/plugin.py b/src/plugins/built_in/proactive_thinker/plugin.py deleted file mode 100644 index c539aacea..000000000 --- a/src/plugins/built_in/proactive_thinker/plugin.py +++ /dev/null @@ -1,39 +0,0 @@ -from src.common.logger import get_logger -from src.plugin_system import ( - BaseEventHandler, - BasePlugin, - ConfigField, - EventHandlerInfo, - register_plugin, -) -from src.plugin_system.base.base_plugin import BasePlugin - -from .proacive_thinker_event import ProactiveThinkerEventHandler - -logger = get_logger(__name__) - - -@register_plugin -class ProactiveThinkerPlugin(BasePlugin): - """一个主动思考的插件""" - - plugin_name: str = "proactive_thinker" - enable_plugin: bool = True - dependencies: list[str] = [] - python_dependencies: list[str] = [] - config_file_name: str = "config.toml" - config_schema: dict = { - "plugin": { - "config_version": ConfigField(type=str, default="1.1.0", description="配置文件版本"), - }, - } - - def __init__(self, *args, **kwargs): - super().__init__(*args, **kwargs) - - def get_plugin_components(self) -> list[tuple[EventHandlerInfo, type[BaseEventHandler]]]: - """返回插件的EventHandler组件""" - components: list[tuple[EventHandlerInfo, type[BaseEventHandler]]] = [ - (ProactiveThinkerEventHandler.get_handler_info(), ProactiveThinkerEventHandler) - ] - return components diff --git a/src/plugins/built_in/proactive_thinker/proacive_thinker_event.py b/src/plugins/built_in/proactive_thinker/proacive_thinker_event.py deleted file mode 100644 index 2f062c0b0..000000000 --- a/src/plugins/built_in/proactive_thinker/proacive_thinker_event.py +++ /dev/null @@ -1,246 +0,0 @@ -import asyncio -import random -import time -import traceback -from datetime import datetime - -from maim_message import UserInfo - -from src.chat.message_receive.chat_stream import get_chat_manager -from src.common.logger import get_logger -from src.config.config import global_config -from src.manager.async_task_manager import AsyncTask, async_task_manager -from src.plugin_system import BaseEventHandler, EventType -from src.plugin_system.apis import chat_api, message_api, person_api -from src.plugin_system.base.base_event import HandlerResult - -from .proactive_thinker_executor import ProactiveThinkerExecutor - -logger = get_logger(__name__) - - -class ColdStartTask(AsyncTask): - """ - “冷启动”任务,在机器人启动时执行一次。 - 它的核心职责是“唤醒”那些因重启而“沉睡”的聊天流,确保它们能够接收主动思考。 - 对于在白名单中但从未有过记录的全新用户,它也会发起第一次“破冰”问候。 - """ - - def __init__(self, bot_start_time: float): - super().__init__(task_name="ColdStartTask") - self.chat_manager = get_chat_manager() - self.executor = ProactiveThinkerExecutor() - self.bot_start_time = bot_start_time - - async def run(self): - """任务主逻辑,在启动后执行一次白名单扫描。""" - logger.info("冷启动任务已启动,将在短暂延迟后开始唤醒沉睡的聊天流...") - await asyncio.sleep(30) # 延迟以确保所有服务和聊天流已从数据库加载完毕 - - try: - logger.info("【冷启动】开始扫描白名单,唤醒沉睡的聊天流...") - - # 【修复】增加对私聊总开关的判断 - if not global_config.proactive_thinking.enable_in_private: - logger.info("【冷启动】私聊主动思考功能未启用,任务结束。") - return - - enabled_private_chats = global_config.proactive_thinking.enabled_private_chats - if not enabled_private_chats: - logger.debug("【冷启动】私聊白名单为空,任务结束。") - return - - for chat_id in enabled_private_chats: - try: - platform, user_id_str = chat_id.split(":") - user_id = int(user_id_str) - - should_wake_up = False - stream = chat_api.get_stream_by_user_id(user_id_str, platform) - - if not stream: - should_wake_up = True - logger.info(f"【冷启动】发现全新用户 {chat_id},准备发起第一次问候。") - elif stream.last_active_time < self.bot_start_time: - should_wake_up = True - logger.info( - f"【冷启动】发现沉睡的聊天流 {chat_id} (最后活跃于 {datetime.fromtimestamp(stream.last_active_time)}),准备唤醒。" - ) - - if should_wake_up: - person_id = person_api.get_person_id(platform, user_id) - nickname = await person_api.get_person_value(person_id, "nickname") - user_nickname = nickname or f"用户{user_id}" - user_info = UserInfo(platform=platform, user_id=str(user_id), user_nickname=user_nickname) - - # 使用 get_or_create_stream 来安全地获取或创建流 - stream = await self.chat_manager.get_or_create_stream(platform, user_info) - - formatted_stream_id = f"{stream.user_info.platform}:{stream.user_info.user_id}:private" - await self.executor.execute(stream_id=formatted_stream_id, start_mode="cold_start") - logger.info(f"【冷启动】已为用户 {chat_id} (昵称: {user_nickname}) 发送唤醒/问候消息。") - - except ValueError: - logger.warning(f"【冷启动】白名单条目格式错误或用户ID无效,已跳过: {chat_id}") - except Exception as e: - logger.error(f"【冷启动】处理用户 {chat_id} 时发生未知错误: {e}", exc_info=True) - - except asyncio.CancelledError: - logger.info("冷启动任务被正常取消。") - except Exception as e: - logger.error(f"【冷启动】任务出现严重错误: {e}", exc_info=True) - finally: - logger.info("【冷启动】任务执行完毕。") - - -class ProactiveThinkingTask(AsyncTask): - """ - 主动思考的后台任务(日常唤醒),负责在聊天“冷却”后重新活跃气氛。 - 它只处理已经存在的聊天流。 - """ - - def __init__(self): - super().__init__(task_name="ProactiveThinkingTask") - self.chat_manager = get_chat_manager() - self.executor = ProactiveThinkerExecutor() - - def _get_next_interval(self) -> float: - """ - 动态计算下一次执行的时间间隔,模拟人类行为的随机性。 - 结合了基础间隔、随机偏移和每日不同时段的活跃度调整。 - """ - # 从配置中读取基础间隔和随机范围 - base_interval = global_config.proactive_thinking.interval - sigma = global_config.proactive_thinking.interval_sigma - - # 1. 在 [base - sigma, base + sigma] 范围内随机取一个值 - interval = random.uniform(base_interval - sigma, base_interval + sigma) - - # 2. 根据当前时间,应用活跃度调整因子 - now = datetime.now() - current_time_str = now.strftime("%H:%M") - - adjust_rules = global_config.proactive_thinking.talk_frequency_adjust - if adjust_rules and adjust_rules[0]: - # 按时间对规则排序,确保能找到正确的时间段 - rules = sorted([rule.split(",") for rule in adjust_rules[0][1:]], key=lambda x: x[0]) - - factor = 1.0 - # 找到最后一个小于等于当前时间的规则 - for time_str, factor_str in rules: - if current_time_str >= time_str: - factor = float(factor_str) - else: - break # 后面的时间都比当前晚,无需再找 - # factor > 1 表示更活跃,所以用除法来缩短间隔 - interval /= factor - - # 保证最小间隔,防止过于频繁的骚扰 - return max(60.0, interval) - - async def run(self): - """任务主循环,周期性地检查所有已存在的聊天是否需要“唤醒”。""" - logger.info("日常唤醒任务已启动,将根据动态间隔检查聊天活跃度。") - await asyncio.sleep(15) # 初始等待 - - while True: - # 计算下一次检查前的休眠时间 - next_interval = self._get_next_interval() - try: - logger.debug(f"【日常唤醒】下一次检查将在 {next_interval:.2f} 秒后进行。") - await asyncio.sleep(next_interval) - - logger.info("【日常唤醒】开始检查不活跃的聊天...") - - # 加载白名单配置 - enabled_private = set(global_config.proactive_thinking.enabled_private_chats) - enabled_groups = set(global_config.proactive_thinking.enabled_group_chats) - - # 分别处理私聊和群聊 - # 1. 处理私聊:首先检查私聊总开关 - if global_config.proactive_thinking.enable_in_private: - for chat_id in enabled_private: - try: - platform, user_id_str = chat_id.split(":") - # 【核心逻辑】检查聊天流是否存在。不存在则跳过,交由ColdStartTask处理。 - stream = chat_api.get_stream_by_user_id(user_id_str, platform) - if not stream: - continue - - # 检查冷却时间 - recent_messages = await message_api.get_recent_messages( - chat_id=stream.stream_id, limit=1, limit_mode="latest" - ) - last_message_time = recent_messages[0]["time"] if recent_messages else stream.create_time - time_since_last_active = time.time() - last_message_time - if time_since_last_active > next_interval: - logger.info( - f"【日常唤醒-私聊】聊天流 {stream.stream_id} 已冷却 {time_since_last_active:.2f} 秒,触发主动对话。" - ) - formatted_stream_id = f"{stream.user_info.platform}:{stream.user_info.user_id}:private" - await self.executor.execute(stream_id=formatted_stream_id, start_mode="wake_up") - stream.update_active_time() - await self.chat_manager._save_stream(stream) - - except ValueError: - logger.warning(f"【日常唤醒】私聊白名单条目格式错误,已跳过: {chat_id}") - except Exception as e: - logger.error(f"【日常唤醒】处理私聊用户 {chat_id} 时发生未知错误: {e}", exc_info=True) - - # 2. 处理群聊:首先检查群聊总开关 - if global_config.proactive_thinking.enable_in_group: - all_streams = list(self.chat_manager.streams.values()) - for stream in all_streams: - if not stream.group_info: - continue # 只处理群聊 - - # 【修复】检查群聊是否在白名单内 - if f"qq:{stream.group_info.group_id}" in enabled_groups: - # 检查冷却时间 - recent_messages = await message_api.get_recent_messages(chat_id=stream.stream_id, limit=1) - last_message_time = recent_messages[0]["time"] if recent_messages else stream.create_time - time_since_last_active = time.time() - last_message_time - if time_since_last_active > next_interval: - logger.info( - f"【日常唤醒-群聊】聊天流 {stream.stream_id} 已冷却 {time_since_last_active:.2f} 秒,触发主动对话。" - ) - formatted_stream_id = f"{stream.user_info.platform}:{stream.group_info.group_id}:group" - await self.executor.execute(stream_id=formatted_stream_id, start_mode="wake_up") - stream.update_active_time() - await self.chat_manager._save_stream(stream) - - except asyncio.CancelledError: - logger.info("日常唤醒任务被正常取消。") - break - except Exception as e: - traceback.print_exc() # 打印完整的堆栈跟踪 - logger.error(f"【日常唤醒】任务出现错误,将在60秒后重试: {e}", exc_info=True) - await asyncio.sleep(60) - - -class ProactiveThinkerEventHandler(BaseEventHandler): - """主动思考插件的启动事件处理器,负责根据配置启动一个或两个后台任务。""" - - handler_name: str = "proactive_thinker_on_start" - handler_description: str = "主动思考插件的启动事件处理器" - init_subscribe: list[EventType | str] = [EventType.ON_START] - - async def execute(self, kwargs: dict | None) -> "HandlerResult": - """在机器人启动时执行,根据配置决定是否启动后台任务。""" - logger.info("检测到插件启动事件,正在初始化【主动思考】") - # 检查总开关 - if global_config.proactive_thinking.enable: - bot_start_time = time.time() # 记录“诞生时刻” - - # 启动负责“日常唤醒”的核心任务 - proactive_task = ProactiveThinkingTask() - await async_task_manager.add_task(proactive_task) - - # 检查“冷启动”功能的独立开关 - if global_config.proactive_thinking.enable_cold_start: - cold_start_task = ColdStartTask(bot_start_time) - await async_task_manager.add_task(cold_start_task) - - else: - logger.info("【主动思考】功能未启用,所有任务均跳过启动。") - return HandlerResult(success=True, continue_process=True, message=None) diff --git a/src/plugins/built_in/proactive_thinker/proactive_chatter_refactor_plan.md b/src/plugins/built_in/proactive_thinker/proactive_chatter_refactor_plan.md deleted file mode 100644 index 2ad01b307..000000000 --- a/src/plugins/built_in/proactive_thinker/proactive_chatter_refactor_plan.md +++ /dev/null @@ -1,199 +0,0 @@ -# 主动聊天功能重构与设计方案 - -本文档旨在规划一个全新的、真正的“主动发起对话”功能。方案的核心是创建一个独立的、可配置的插件,并重构现有配置,使其更具模块化和可扩展性。 - -## 1. 配置文件重构 (`bot_config.toml`) - -为了提高清晰度和模块化,我们将创建一个新的配置节 `[proactive_thinking]`。 - -### 1.1. 移除旧配置 - -以下配置项将从 `[chat]` 配置节中 **移除**: - -```toml -# mmc/config/bot_config.toml - -# 从 line 132 开始移除以下所有行 -talk_frequency_adjust = [['', '8:00,1', '12:00,1.2', '18:00,1.5', '01:00,0.6'], ['qq:114514:group', '12:20,1', '16:10,2', '20:10,1', '00:10,0.3'], ['qq:1919810:private', '8:20,1', '12:10,2', '20:10,1.5', '00:10,0.2']] -# ... (所有 talk_frequency_adjust 的注释) ... - -# 主动思考功能配置(仅在focus模式下生效) - -enable_proactive_thinking = false -proactive_thinking_interval = 1500 -# ... (所有 proactive_thinking 的注释和相关配置) ... -delta_sigma = 120 -# ... (所有 delta_sigma 的注释和相关配置) ... -``` - -### 1.2. 新增 `[proactive_thinking]` 配置节 - -在 `bot_config.toml` 文件 **末尾**,添加以下全新配置节: - -```toml -# mmc/config/bot_config.toml - -[proactive_thinking] # 主动思考(主动发起对话)功能配置 -# --- 总开关 --- -enable = false # 是否启用主动发起对话功能 - -# --- 触发时机 --- -# 基础触发间隔(秒),AI会围绕这个时间点主动发起对话 -interval = 1500 # 默认25分钟 -# 间隔随机化标准差(秒),让触发时间更自然。设为0则为固定间隔。 -interval_sigma = 120 -# 每日活跃度调整,格式:[["", "HH:MM,factor", ...], ["stream_id", ...]] -# factor > 1.0 会缩短思考间隔,更活跃;factor < 1.0 会延长间隔。 -talk_frequency_adjust = [['', '8:00,1', '12:00,1.2', '18:00,1.5', '01:00,0.6']] - -# --- 作用范围 --- -enable_in_private = true # 是否允许在私聊中主动发起对话 -enable_in_group = true # 是否允许在群聊中主动发起对话 -# 私聊白名单,为空则对所有私聊生效 -# 格式: ["platform:user_id", ...] e.g., ["qq:123456"] -enabled_private_chats = [] -# 群聊白名单,为空则对所有群聊生效 -# 格式: ["platform:group_id", ...] e.g., ["qq:7891011"] -enabled_group_chats = [] - -# --- 冷启动配置 (针对私聊) --- -# 对于白名单中不活跃的私聊,是否允许进行一次“冷启动”问候 -enable_cold_start = true -# 冷启动后,该私聊的下一次主动思考需要等待的最小时间(秒) -cold_start_cooldown = 86400 # 默认24小时 -``` - -## 2. 新插件架构设计 (`proactive_initiation_chatter`) - -我们将创建一个全新的插件来实现此功能。 - -### 2.1. 文件结构 - -``` -mmc/src/plugins/built_in/proactive_initiation_chatter/ -├── __init__.py -├── _manifest.json -├── plugin.py # 插件主入口,负责启动和管理触发器 -├── trigger_manager.py # 核心触发器,内置于插件中 -├── initiation_chatter.py # Chatter实现,监听触发事件 -└── initiation_planner.py # 规划器,负责决定“说什么” -``` - -### 2.2. 核心组件设计 - -#### `plugin.py` - `ProactiveInitiationPlugin` -- **职责**: 作为插件的入口,它将在插件被加载时,读取 `[proactive_thinking]` 配置,并根据配置启动 `ProactiveTriggerManager`。 -- **启动逻辑 (参考 `maizone_refactored`)**: - -#### `trigger_manager.py` - `ProactiveTriggerManager` -- **职责**: 这是一个后台服务类,负责管理所有聊天流的触发计时器,并实现包括“冷启动”在内的所有复杂触发逻辑。 -- **核心逻辑 (参考 `SchedulerService`)**: - - 维护一个异步主循环,定期检查所有符合条件的聊天流。 - - 根据配置的间隔和活跃度调整,计算下次触发时间。 - - 在触发时,调用 `InitiationPlanner` 来决定具体内容,并通过事件管理器派发 `ProactiveInitiationEvent` 或 `ColdStartInitiationEvent`。 - ---- - -## 3. 核心交互与依赖 - -新的 `proactive_initiation_chatter` 插件将与以下核心系统模块进行交互,以确保其决策的智能性和合规性: - -- **`Config`**: `TriggerManager` 和 `Planner` 将从全局配置中读取 `[proactive_thinking]` 配置节,以获取所有行为参数。 -- **`EventManager`**: `TriggerManager` 将通过事件管理器派发 `ProactiveInitiationEvent` 和 `ColdStartInitiationEvent` 事件。`InitiationChatter` 则会监听这些事件以触发执行。 -- **`AsyncTaskManager`**: `ProactiveInitiationPlugin` 将使用此管理器来安全地在后台运行 `TriggerManager` 的主循环。 -- **`ChatManager` (from `chat_stream.py`)**: 这是实现“冷启动”的核心。`TriggerManager` 将调用 `chat_manager.get_or_create_stream()` 方法来按需“唤醒”或创建不活跃的聊天流实例及其附带的空上下文。 -- **`SleepManager`**: 在每次触发决策前,`TriggerManager` **必须**查询 `SleepManager` 以确认AI当前未处于睡眠状态。 -- **`ScheduleManager` / `MonthlyPlanManager`**: `InitiationPlanner` 的“待办任务驱动”策略会查询这些管理器,以获取可作为聊天话题的日程或计划。 -- **`MemoryManager` / `ContextManager`**: `InitiationPlanner` 的“记忆驱动”策略会查询长期记忆和短期上下文,以寻找关联性话题。 -- **`RelationshipManager`**: `InitiationPlanner` 可以查询关系分数,作为执行某些话题策略的门槛。 - -## 4. 插件清单文件 (`_manifest.json`) - -插件的清单文件将定义其元数据和依赖。 - -```json -{ - "manifest_version": 1, - "name": "ProactiveInitiationChatter", - "version": "1.0.0", - "author": "Kilo Code", - "description": "一个真正的主动发起对话插件,由内置的、可高度配置的触发器驱动。", - "dependencies": [], - "python_dependencies": [] -} -``` - ---- - -## 5. 上下文获取与“唤醒”机制详解 - -本设计区分了“热启动”(针对活跃聊天)和“冷启动”(针对非活跃聊天)两种场景,并利用 `ChatManager` 的不同方法来优雅地处理。 - -### 热启动流程 (Hot Start - 针对活跃聊天) - -这是最常见的场景。当一个聊天流近期有过对话,其实例存在于 `ChatManager` 的内存缓存中。 - -1. **获取现有上下文**: `ProactiveTriggerManager` 决定对一个活跃的 `stream_id` 发起对话时,它会调用 `chat_manager.get_stream(stream_id)`。 -2. **返回缓存实例**: `ChatManager` 会直接从内存中返回缓存的 `ChatStream` 实例。 -3. **传递丰富上下文**: 这个实例中包含了**完整的、包含近期对话历史**的 `stream_context`。 -4. **智能决策**: `TriggerManager` 将这个**充满信息**的上下文派发给 `InitiationPlanner`。`Planner` 因此可以优先使用“记忆驱动”等高级策略,生成与前文高度相关的话题,使对话显得自然、连贯。 - -### 冷启动流程 (Cold Start - “唤醒”非活跃聊天) - -针对在白名单中,但当前未加载到内存的私聊。 - -**核心方法:** `ChatManager.get_or_create_stream(platform, user_info, group_info)` - -**唤醒流程:** - -1. `ProactiveTriggerManager` 在主循环中识别到一个需要“冷启动”的私聊 `stream_id`。 -2. `TriggerManager` 构造出必要的 `UserInfo` 对象。 -3. 它调用 `get_chat_manager()`,然后执行核心的唤醒调用: - ```python - # (伪代码) - chat_stream = await chat_manager.get_or_create_stream(...) - ``` -4. 此调用会从数据库加载或全新创建一个 `ChatStream` 实例,该实例内部会自动创建一个**不包含任何历史消息的空上下文**。 -5. `TriggerManager` 将这个**空的 `StreamContext`** 连同 `ColdStartInitiationEvent` 事件一同派发出去,以触发通用的问候语。 - -此双轨制流程无需修改任何核心系统代码,仅通过合理调用现有接口即可实现,保证了方案的稳定性和兼容性。 - ---- - -这份经过强化的设计文档详细说明了配置文件的修改方案、新插件的内部架构以及与核心系统的交互模式。请您审阅。如果这份蓝图符合您的预期,我们就可以准备将此计划交付实施。 - -另外附加:我计划在 InitiationPlanner 中实现一个策略选择系统。每次被 TriggerManager 触发时,它会评估多种“主动聊天策略”的“适宜度分数”,然后选择分数最高的策略来执行。 - -以下是我初步设计的几种策略: - -ColdStartGreetingStrategy (冷启动问候策略) - -触发条件:仅在 TriggerManager 派发 ColdStartInitiationEvent 事件时触发。 -核心逻辑:生成一句通用的、友好的问候语,比如“你好呀!”或者“最近怎么样?”。这是为了“唤醒”那些很久没聊天的私聊对象。 -适宜度分数:固定高分(例如 1.0),确保在冷启动时优先执行。 -MemoryDrivenStrategy (记忆驱动策略) - -触发条件:常规触发 (ProactiveInitiationEvent),且当前聊天流的上下文不为空。 -核心逻辑: -查询 MemoryManager,获取关于当前聊天对象的长期记忆或近期摘要。 -查询 ContextManager,分析最近的几条对话,寻找可以延续的话题。 -利用 LLM 生成一个与上下文或记忆相关的话题。例如:“我们上次聊到的那个项目,后来进展如何了?” -适宜度分数计算 (借鉴AFC): -context_relevance_score (上下文相关性):上下文越丰富、越接近现在,分数越高。 -relationship_score (关系分):从 RelationshipManager 获取,关系越好,越适合深入聊记忆话题。 -final_score = (context_relevance_score * 权重) + (relationship_score * 权重) -TaskDrivenStrategy (任务/日程驱动策略) - -触发条件:常规触发。 -核心逻辑: -查询 ScheduleManager 或 MonthlyPlanManager,看看今天或最近有没有“待办事项”或“计划”。 -如果有,可以围绕这个任务发起对话。例如:“我看到日程表上说今天要去图书馆,准备好了吗?” -适宜度分数计算: -task_urgency_score (任务紧急度):任务越紧急,分数越高。 -task_relevance_score (任务相关度):如果任务与当前聊天对象有关,分数更高。 -final_score = (task_urgency_score * 权重) + (task_relevance_score * 权重) -GenericTopicStrategy (通用话题策略) - -触发条件:作为所有其他策略都无法执行时的“兜底”策略。 -核心逻辑:从一个预设的话题库(或者让 LLM 随机生成)中挑选一个通用的话题,比如“今天天气不错,适合出门散步呢”或者“最近有什么有趣的新闻吗?”。 -适宜度分数:固定低分(例如 0.1),确保它是最后的选择。 \ No newline at end of file diff --git a/src/plugins/built_in/proactive_thinker/proactive_thinker_executor.py b/src/plugins/built_in/proactive_thinker/proactive_thinker_executor.py deleted file mode 100644 index 21b8ff5bb..000000000 --- a/src/plugins/built_in/proactive_thinker/proactive_thinker_executor.py +++ /dev/null @@ -1,337 +0,0 @@ -import time -from datetime import datetime -from typing import Any - -import orjson - -from src.chat.utils.chat_message_builder import build_readable_actions, get_actions_by_timestamp_with_chat -from src.chat.utils.prompt import Prompt -from src.common.logger import get_logger -from src.config.config import global_config, model_config -from src.mood.mood_manager import mood_manager -from src.person_info.person_info import get_person_info_manager -from src.plugin_system.apis import ( - chat_api, - database_api, - generator_api, - llm_api, - message_api, - person_api, - schedule_api, - send_api, -) - -from .prompts import DECISION_PROMPT, PLAN_PROMPT - -logger = get_logger(__name__) - - -class ProactiveThinkerExecutor: - """ - 主动思考执行器 V2 - - 统一执行入口 - - 引入决策模块,判断是否及如何发起对话 - - 结合人设、日程、关系信息生成更具情境的对话 - """ - - def __init__(self): - """ - 初始化 ProactiveThinkerExecutor 实例。 - 目前无需初始化操作。 - """ - pass - - async def execute(self, stream_id: str, start_mode: str = "wake_up"): - """ - 统一执行入口 - Args: - stream_id: 聊天流ID - start_mode: 启动模式, 'cold_start' 或 'wake_up' - """ - logger.info(f"开始为聊天流 {stream_id} 执行主动思考,模式: {start_mode}") - - # 1. 信息收集 - context = await self._gather_context(stream_id) - if not context: - return - - # 2. 决策阶段 - decision_result = await self._make_decision(context, start_mode) - - if not decision_result or not decision_result.get("should_reply"): - reason = decision_result.get("reason", "未提供") if decision_result else "决策过程返回None" - logger.info(f"决策结果为:不回复。原因: {reason}") - await database_api.store_action_info( - chat_stream=self._get_stream_from_id(stream_id), - action_name="proactive_decision", - action_prompt_display=f"主动思考决定不回复,原因: {reason}", - action_done=True, - action_data=decision_result, - ) - return - - # 3. 规划与执行阶段 - topic = decision_result.get("topic", "打个招呼") - reason = decision_result.get("reason", "无") - await database_api.store_action_info( - chat_stream=self._get_stream_from_id(stream_id), - action_name="proactive_decision", - action_prompt_display=f"主动思考决定回复,原因: {reason},话题:{topic}", - action_done=True, - action_data=decision_result, - ) - logger.info(f"决策结果为:回复。话题: {topic}") - - # 根据聊天类型构建特定上下文 - if context["chat_type"] == "private": - user_info = context["user_info"] - relationship = context["relationship"] - target_user_or_group = f"你的朋友 '{user_info.user_nickname}'" - context_specific_block = f""" -1. **你的日程**: -{context["schedule_context"]} -2. **你和Ta的关系**: - - 详细印象: {relationship["impression"]} - - 好感度: {relationship["attitude"]}/100 -3. **最近的聊天摘要**: -{context["recent_chat_history"]} -4. **你最近的相关动作**: -{context["action_history_context"]} -""" - else: # group - group_info = context["group_info"] - target_user_or_group = f"群聊 '{group_info['group_name']}'" - context_specific_block = f""" -1. **你的日程**: -{context["schedule_context"]} -2. **群聊信息**: - - 群名称: {group_info["group_name"]} -3. **最近的聊天摘要**: -{context["recent_chat_history"]} -4. **你最近的相关动作**: -{context["action_history_context"]} -""" - - plan_prompt = PLAN_PROMPT.format( - bot_nickname=global_config.bot.nickname, - persona_core=context["persona"]["core"], - persona_side=context["persona"]["side"], - identity=context["persona"]["identity"], - current_time=context["current_time"], - target_user_or_group=target_user_or_group, - reason=reason, - topic=topic, - context_specific_block=context_specific_block, - mood_state=context["mood_state"], - ) - - if global_config.debug.show_prompt: - logger.info(f"主动思考回复器原始提示词:{plan_prompt}") - - is_success, response, _, _ = await llm_api.generate_with_model( - prompt=plan_prompt, model_config=model_config.model_task_config.replyer - ) - - if is_success and response: - stream = self._get_stream_from_id(stream_id) - if stream: - # 使用消息分割器处理并发送消息 - reply_set = generator_api.process_human_text(response, enable_splitter=True, enable_chinese_typo=False) - for reply_type, content in reply_set: - if reply_type == "text": - await send_api.text_to_stream(stream_id=stream.stream_id, text=content) - else: - logger.warning(f"无法发送消息,因为找不到 stream_id 为 {stream_id} 的聊天流") - - def _get_stream_from_id(self, stream_id: str): - """ - 根据 stream_id 解析并获取对应的聊天流对象。 - - Args: - stream_id: 聊天流的唯一标识符,格式为 "platform:chat_id:stream_type"。 - - Returns: - 对应的 ChatStream 对象,如果解析失败或找不到则返回 None。 - """ - try: - platform, chat_id, stream_type = stream_id.split(":") - if stream_type == "private": - return chat_api.ChatManager.get_private_stream_by_user_id(platform=platform, user_id=chat_id) - elif stream_type == "group": - return chat_api.ChatManager.get_group_stream_by_group_id(platform=platform, group_id=chat_id) - except Exception as e: - logger.error(f"获取 stream_id ({stream_id}) 失败: {e}") - return None - - async def _gather_context(self, stream_id: str) -> dict[str, Any] | None: - """ - 收集构建决策和规划提示词所需的所有上下文信息。 - - 此函数会根据聊天流是私聊还是群聊,收集不同的信息, - 包括但不限于日程、聊天历史、人设、关系信息等。 - - Args: - stream_id: 聊天流ID。 - - Returns: - 一个包含所有上下文信息的字典,如果找不到聊天流则返回 None。 - """ - stream = self._get_stream_from_id(stream_id) - if not stream: - logger.warning(f"无法找到 stream_id 为 {stream_id} 的聊天流") - return None - - # 1. 收集通用信息 (日程, 聊天历史, 动作历史) - schedules = await schedule_api.ScheduleAPI.get_today_schedule() - schedule_context = ( - "\n".join([f"- {s.get('time_range', '未知时间')}: {s.get('activity', '未知活动')}" for s in schedules]) - if schedules - else "今天没有日程安排。" - ) - - recent_messages = await message_api.get_recent_messages( - stream.stream_id, limit=50, limit_mode="latest", hours=12 - ) - recent_chat_history = ( - await message_api.build_readable_messages_to_str(recent_messages) if recent_messages else "无" - ) - - action_history_list = await get_actions_by_timestamp_with_chat( - chat_id=stream.stream_id, - timestamp_start=time.time() - 3600 * 24, # 过去24小时 - timestamp_end=time.time(), - limit=7, - ) - - action_history_context = build_readable_actions(actions=action_history_list) - - # 2. 构建基础上下文 - mood_state = "暂时没有" - if global_config.mood.enable_mood: - try: - mood_state = mood_manager.get_mood_by_chat_id(stream.stream_id).mood_state - except Exception as e: - logger.error(f"获取情绪失败,原因:{e}") - base_context = { - "schedule_context": schedule_context, - "recent_chat_history": recent_chat_history, - "action_history_context": action_history_context, - "mood_state": mood_state, - "persona": { - "core": global_config.personality.personality_core, - "side": global_config.personality.personality_side, - "identity": global_config.personality.identity, - }, - "current_time": datetime.now().strftime("%Y-%m-%d %H:%M:%S"), - } - - # 3. 根据聊天类型补充特定上下文 - if stream.group_info: # 群聊场景 - base_context.update( - { - "chat_type": "group", - "group_info": {"group_name": stream.group_info.group_name, "group_id": stream.group_info.group_id}, - } - ) - return base_context - elif stream.user_info: # 私聊场景 - user_info = stream.user_info - if not user_info.platform or not user_info.user_id: - logger.warning(f"Stream {stream_id} 的 user_info 不完整") - return None - - person_id = person_api.get_person_id(user_info.platform, int(user_info.user_id)) - person_info_manager = get_person_info_manager() - person_info = await person_info_manager.get_values(person_id, ["user_id", "platform", "person_name"]) - cross_context_block = await Prompt.build_cross_context(stream.stream_id, "s4u", person_info) - - # 获取关系信息 - short_impression = await person_info_manager.get_value(person_id, "short_impression") or "无" - impression = await person_info_manager.get_value(person_id, "impression") or "无" - attitude = await person_info_manager.get_value(person_id, "attitude") or 50 - - base_context.update( - { - "chat_type": "private", - "person_id": person_id, - "user_info": user_info, - "cross_context_block": cross_context_block, - "relationship": { - "short_impression": short_impression, - "impression": impression, - "attitude": attitude, - }, - } - ) - return base_context - else: - logger.warning(f"Stream {stream_id} 既没有 group_info 也没有 user_info") - return None - - async def _make_decision(self, context: dict[str, Any], start_mode: str) -> dict[str, Any] | None: - """ - 调用 LLM 进行决策,判断是否应该主动发起对话,以及聊什么话题。 - """ - if context["chat_type"] not in ["private", "group"]: - return {"should_reply": False, "reason": "未知的聊天类型"} - - # 根据聊天类型构建特定上下文 - if context["chat_type"] == "private": - user_info = context["user_info"] - relationship = context["relationship"] - target_user_or_group = f"用户 '{user_info.user_nickname}'" - context_specific_block = f""" - 1. **启动模式**: {start_mode} ({"初次见面/很久未见" if start_mode == "cold_start" else "日常唤醒"}) - 2. **你的日程**: - {context["schedule_context"]} - 3. **你和Ta的关系**: - - 简短印象: {relationship["short_impression"]} - - 详细印象: {relationship["impression"]} - - 好感度: {relationship["attitude"]}/100 - 4. **和Ta在别处的讨论摘要**: - {context["cross_context_block"]} - 5. **最近的聊天摘要**: - {context["recent_chat_history"]} - """ - else: # group - group_info = context["group_info"] - target_user_or_group = f"群聊 '{group_info['group_name']}'" - context_specific_block = f""" - 1. **启动模式**: {start_mode} ({"首次加入/很久未发言" if start_mode == "cold_start" else "日常唤醒"}) - 2. **你的日程**: - {context["schedule_context"]} - 3. **群聊信息**: - - 群名称: {group_info["group_name"]} - 4. **最近的聊天摘要**: - {context["recent_chat_history"]} - """ - prompt = DECISION_PROMPT.format( - bot_nickname=global_config.bot.nickname, - persona_core=context["persona"]["core"], - persona_side=context["persona"]["side"], - identity=context["persona"]["identity"], - mood_state=context["mood_state"], - action_history_context=context["action_history_context"], - current_time=context["current_time"], - target_user_or_group=target_user_or_group, - context_specific_block=context_specific_block, - ) - - if global_config.debug.show_prompt: - logger.info(f"主动思考决策器原始提示词:{prompt}") - - is_success, response, _, _ = await llm_api.generate_with_model( - prompt=prompt, model_config=model_config.model_task_config.utils - ) - - if not is_success: - return {"should_reply": False, "reason": "决策模型生成失败"} - - try: - if global_config.debug.show_prompt: - logger.info(f"主动思考决策器响应:{response}") - decision = orjson.loads(response) - return decision - except orjson.JSONDecodeError: - logger.error(f"决策LLM返回的JSON格式无效: {response}") - return {"should_reply": False, "reason": "决策模型返回格式错误"} diff --git a/src/plugins/built_in/proactive_thinker/prompts.py b/src/plugins/built_in/proactive_thinker/prompts.py deleted file mode 100644 index af27c9afa..000000000 --- a/src/plugins/built_in/proactive_thinker/prompts.py +++ /dev/null @@ -1,97 +0,0 @@ -from src.chat.utils.prompt import Prompt - -# ============================================================================= -# 决策阶段 (Decision Phase) -# ============================================================================= - -DECISION_PROMPT = Prompt( - name="proactive_thinker_decision", - template=""" -# 角色 -你的名字是{bot_nickname},你的人设如下: -- 核心人设: {persona_core} -- 侧面人设: {persona_side} -- 身份: {identity} - -你的当前情绪状态是: {mood_state} - -# 你最近的相关决策历史 (供参考) -{action_history_context} - -# 任务 -现在是 {current_time},你需要根据当前的情境,决定是否要主动向{target_user_or_group}发起对话。 - -# 情境分析 -{context_specific_block} - -# 决策目标 -你的最终目标是根据你的角色和当前情境,做出一个最符合人类社交直觉的决策,以求: -- **(私聊)深化关系**: 通过展现你的关心、记忆和个性来拉近与对方的距离。 -- **(群聊)活跃气氛**: 提出能引起大家兴趣的话题,促进群聊的互动。 -- **提供价值**: 你的出现应该是有意义的,无论是情感上的温暖,还是信息上的帮助。 -- **保持自然**: 避免任何看起来像机器人或骚扰的行为。 - -# 决策指令 -请综合以上所有信息,以稳定、真实、拟人的方式做出决策。你的决策需要以JSON格式输出,包含以下字段: -- `should_reply`: bool, 是否应该发起对话。 -- `topic`: str, 如果 `should_reply` 为 true,你打算聊什么话题? -- `reason`: str, 做出此决策的简要理由,需体现你对上述目标的考量。 - -# 决策流程与核心原则 -1. **检查对话状态**: - - **最后发言者**: 查看【最近的聊天摘要】。如果最后一条消息是你发的,且对方尚未回复,**通常应选择不回复**。这是最重要的原则,以避免打扰。 - - **例外**: 只有在等待时间足够长(例如超过数小时),或者你有非常重要且有时效性的新话题时,才考虑再次发言。 - - **无人发言**: 如果最近的聊天记录里只有你一个人在说话,**绝对不要回复**,以防刷屏。 - -2. **寻找话题切入点 (如果可以回复)**: - - **强关联优先**: 优先从【情境分析】中寻找最自然、最相关的话题。顺序建议:`最近的聊天摘要` > `你和Ta的关系` > `你的日程`。 - - **展现个性**: 结合你的【人设】和【情绪】,思考你会如何看待这些情境信息,并从中找到话题。 - - **备选方案**: 如果实在没有强关联的话题,可以发起一个简单的日常问候。 - -3. **最终决策**: - - **权衡频率**: 查看【你最近的相关决策历史】。如果你在短时间内已经主动发起过多次对话,也应倾向于**不回复**,保持一定的社交距离。 - - **质量胜于数量**: 宁可错过一次普通的互动机会,也不要进行一次尴尬或生硬的对话。 - ---- -请输出你的决策: -""" -) - -# ============================================================================= -# 回复规划阶段 (Plan Phase) -# ============================================================================= - -PLAN_PROMPT = Prompt( - name="proactive_thinker_plan", - template=""" -# 角色 -你的名字是{bot_nickname},你的人设如下: -- 核心人设: {persona_core} -- 侧面人设: {persona_side} -- 身份: {identity} - -# 任务 -现在是 {current_time},你需要主动向{target_user_or_group}发起对话。 - -# 决策上下文 -- **决策理由**: {reason} - -# 情境分析 -{context_specific_block} - -# 对话指引 -- 你决定和Ta聊聊关于“{topic}”的话题。 -- **对话风格**: - - **自然开场**: 你可以根据话题和情境,选择最自然的开场方式。可以直接切入话题(如果话题关联性很强),也可以先用一句简单的问候作为过渡。**不要总是使用同一种开场白**。 - - **融合情境**: 将【情境分析】中的信息巧妙地融入到对话中,让你的话语听起来更真实、更有依据。 - - **符合人设**: 你的语气、用词、甚至表情符号的使用,都应该完全符合你的【角色】设定和当前【情绪】({mood_state})。 - -# 输出要求 -- **简洁**: 不要输出任何多余内容(如前缀、后缀、冒号、引号、at/@等)。 -- **原创**: 不要重复之前的内容,即使意思相近也不行。 -- **直接**: 只输出最终的回复文本本身。 -- **风格**: 回复需简短、完整且口语化。 - -现在,你说: -""" -) diff --git a/template/bot_config_template.toml b/template/bot_config_template.toml index e824467d2..87315c2ad 100644 --- a/template/bot_config_template.toml +++ b/template/bot_config_template.toml @@ -1,5 +1,5 @@ [inner] -version = "7.5.2" +version = "7.5.3" #----以下是给开发人员阅读的,如果你只是部署了MoFox-Bot,不需要阅读---- #如果你想要修改配置文件,请递增version的值 @@ -568,30 +568,67 @@ relationship_tracking_interval_min = 300 # 关系追踪最小间隔时间(秒 relationship_tracking_cooldown_hours = 1.0 # 同一用户关系追踪冷却时间(小时) [proactive_thinking] # 主动思考(主动发起对话)功能配置 -# --- 总开关 --- -enable = true # 是否启用主动发起对话功能 +# 详细配置说明请参考:docs/proactive_thinking_config_guide.md -# --- 触发时机 --- -# 基础触发间隔(秒),AI会围绕这个时间点主动发起对话 -interval = 1500 # 默认25分钟 -# 间隔随机化标准差(秒),让触发时间更自然。设为0则为固定间隔。 -interval_sigma = 120 -# 每日活跃度调整,格式:[["", "HH:MM,factor", ...], ["stream_id", ...]] -# factor > 1.0 会缩短思考间隔,更活跃;factor < 1.0 会延长间隔。 -talk_frequency_adjust = [["", "8:00,1", "12:00,1.2", "18:00,1.5", "01:00,0.6"]] +# --- 总开关 --- +enable = false # 是否启用主动发起对话功能 + +# --- 间隔配置 --- +base_interval = 1800 # 基础触发间隔(秒),默认30分钟 +min_interval = 600 # 最小触发间隔(秒),默认10分钟 +max_interval = 7200 # 最大触发间隔(秒),默认2小时 + +# 动态调整配置 +use_interest_score = true # 是否根据兴趣分数动态调整间隔 +interest_score_factor = 2.0 # 兴趣分数影响因子(1.0-3.0) +# 公式: interval = base_interval * (interest_score_factor - interest_score) +# 例如: interest_score=0.8, factor=2.0 -> interval = 1800 * 1.2 = 2160秒(36分钟) + +# --- 黑白名单配置 --- +whitelist_mode = false # 是否启用白名单模式(启用后只对白名单中的聊天流生效) +blacklist_mode = false # 是否启用黑名单模式(启用后排除黑名单中的聊天流) + +# 白名单配置(示例格式) +whitelist_private = [] # 私聊白名单,格式: ["qq:12345:private"] +whitelist_group = [] # 群聊白名单,格式: ["qq:123456:group"] + +# 黑名单配置(示例格式) +blacklist_private = [] # 私聊黑名单,格式: ["qq:12345:private"] +blacklist_group = [] # 群聊黑名单,格式: ["qq:999999:group"] # --- 作用范围 --- enable_in_private = true # 是否允许在私聊中主动发起对话 enable_in_group = true # 是否允许在群聊中主动发起对话 -# 私聊白名单,为空则对所有私聊生效 -# 格式: ["platform:user_id", ...] e.g., ["qq:123456"] -enabled_private_chats = [] -# 群聊白名单,为空则对所有群聊生效 -# 格式: ["platform:group_id", ...] e.g., ["qq:7891011"] -enabled_group_chats = [] -# --- 冷启动配置 (针对私聊) --- -# 对于白名单中不活跃的私聊,是否允许进行一次“冷启动”问候 -enable_cold_start = true -# 冷启动后,该私聊的下一次主动思考需要等待的最小时间(秒) -cold_start_cooldown = 86400 # 默认24小时 \ No newline at end of file +# --- 兴趣分数阈值 --- +min_interest_score = 0.0 # 最低兴趣分数阈值,低于此值不会主动思考 +max_interest_score = 1.0 # 最高兴趣分数阈值,高于此值不会主动思考 + +# --- 时间策略配置 --- +enable_time_strategy = false # 是否启用时间策略(根据时段调整频率) +quiet_hours_start = "00:00" # 安静时段开始时间,格式: "HH:MM" +quiet_hours_end = "07:00" # 安静时段结束时间,格式: "HH:MM" +active_hours_multiplier = 0.7 # 活跃时段间隔倍数,<1表示更频繁,>1表示更稀疏 + +# --- 冷却与限制 --- +reply_reset_enabled = true # bot回复后是否重置定时器(避免回复后立即又主动发言) +topic_throw_cooldown = 3600 # 抛出话题后的冷却时间(秒),期间暂停主动思考 +max_daily_proactive = 0 # 每个聊天流每天最多主动发言次数,0表示不限制 + +# --- 决策权重配置 --- +do_nothing_weight = 0.4 # do_nothing动作的基础权重 +simple_bubble_weight = 0.3 # simple_bubble动作的基础权重 +throw_topic_weight = 0.3 # throw_topic动作的基础权重 + +# --- 调试与监控 --- +enable_statistics = true # 是否启用统计功能(记录触发次数、决策分布等) +log_decisions = false # 是否记录每次决策的详细日志(用于调试) + +# --- 兼容旧配置(已废弃,建议删除) --- +interval = 1800 # [已废弃] 请使用 base_interval +interval_sigma = 120 # [已废弃] 随机化功能已移除 +talk_frequency_adjust = [] # [已废弃] 请使用 enable_time_strategy 和相关配置 +enabled_private_chats = [] # [已废弃] 请使用 whitelist_private +enabled_group_chats = [] # [已废弃] 请使用 whitelist_group +enable_cold_start = false # [已废弃] 冷启动功能已移除 +cold_start_cooldown = 86400 # [已废弃] 冷启动功能已移除 \ No newline at end of file