diff --git a/plugins/hello_world_plugin/plugin.py b/plugins/hello_world_plugin/plugin.py index e10d7aa39..44c3ceb39 100644 --- a/plugins/hello_world_plugin/plugin.py +++ b/plugins/hello_world_plugin/plugin.py @@ -1,7 +1,9 @@ -import logging import random from typing import Any +from src.common.logger import get_logger + +# 修正导入路径,让Pylance不再抱怨 from src.plugin_system import ( BaseAction, BaseEventHandler, @@ -17,9 +19,9 @@ from src.plugin_system import ( ToolParamType, register_plugin, ) -from src.plugin_system.base.component_types import InjectionRule,InjectionType from src.plugin_system.base.base_event import HandlerResult +logger = get_logger("hello_world_plugin") class StartupMessageHandler(BaseEventHandler): """启动时打印消息的事件处理器。""" @@ -29,7 +31,7 @@ class StartupMessageHandler(BaseEventHandler): init_subscribe = [EventType.ON_START] async def execute(self, params: dict) -> HandlerResult: - logging.info("🎉 Hello World 插件已启动,准备就绪!") + logger.info("🎉 Hello World 插件已启动,准备就绪!") return HandlerResult(success=True, continue_process=True) @@ -186,7 +188,7 @@ class WeatherPrompt(BasePrompt): prompt_name = "weather_info_prompt" prompt_description = "向Planner注入当前天气信息,以丰富对话上下文。" - injection_rules = [InjectionRule(target_prompt="planner_prompt", injection_type=InjectionType.REPLACE, target_content="## 可用动作列表")] + injection_point = "planner_prompt" async def execute(self) -> str: # 在实际应用中,这里可以调用天气API diff --git a/src/chat/message_receive/bot.py b/src/chat/message_receive/bot.py index 544dec94f..dc6634f65 100644 --- a/src/chat/message_receive/bot.py +++ b/src/chat/message_receive/bot.py @@ -659,41 +659,6 @@ class ChatBot: group_name = getattr(group_info, "group_name", None) group_platform = getattr(group_info, "platform", None) - # 准备 additional_config,将 format_info 嵌入其中 - additional_config_str = None - try: - import orjson - - additional_config_data = {} - - # 首先获取adapter传递的additional_config - if hasattr(message_info, 'additional_config') and message_info.additional_config: - if isinstance(message_info.additional_config, dict): - additional_config_data = message_info.additional_config.copy() - elif isinstance(message_info.additional_config, str): - try: - additional_config_data = orjson.loads(message_info.additional_config) - except Exception as e: - logger.warning(f"无法解析 additional_config JSON: {e}") - additional_config_data = {} - - # 然后添加format_info到additional_config中 - if hasattr(message_info, 'format_info') and message_info.format_info: - try: - format_info_dict = message_info.format_info.to_dict() - additional_config_data["format_info"] = format_info_dict - logger.debug(f"[bot.py] 嵌入 format_info 到 additional_config: {format_info_dict}") - except Exception as e: - logger.warning(f"将 format_info 转换为字典失败: {e}") - else: - logger.warning(f"[bot.py] [问题] 消息缺少 format_info: message_id={message_id}") - - # 序列化为JSON字符串 - if additional_config_data: - additional_config_str = orjson.dumps(additional_config_data).decode("utf-8") - except Exception as e: - logger.error(f"准备 additional_config 失败: {e}") - # 创建数据库消息对象 db_message = DatabaseMessages( message_id=message_id, @@ -709,7 +674,6 @@ class ChatBot: is_notify=bool(message.is_notify), is_public_notice=bool(message.is_public_notice), notice_type=message.notice_type, - additional_config=additional_config_str, user_id=user_id, user_nickname=user_nickname, user_cardname=user_cardname, diff --git a/src/chat/message_receive/chat_stream.py b/src/chat/message_receive/chat_stream.py index 1cc3e548b..d3d418648 100644 --- a/src/chat/message_receive/chat_stream.py +++ b/src/chat/message_receive/chat_stream.py @@ -213,8 +213,8 @@ class ChatStream: priority_info=json.dumps(getattr(message, "priority_info", None)) if getattr(message, "priority_info", None) else None, - # 额外配置 - 需要将 format_info 嵌入到 additional_config 中 - additional_config=self._prepare_additional_config(message_info), + # 额外配置 + additional_config=getattr(message_info, "additional_config", None), # 用户信息 user_id=str(getattr(user_info, "user_id", "")), user_nickname=getattr(user_info, "user_nickname", ""), @@ -253,59 +253,8 @@ class ChatStream: f"interest_value: {db_message.interest_value}" ) - def _prepare_additional_config(self, message_info) -> str | None: - """ - 准备 additional_config,将 format_info 嵌入其中 - - 这个方法模仿 storage.py 中的逻辑,确保 DatabaseMessages 中的 additional_config - 包含 format_info,使得 action_modifier 能够正确获取适配器支持的消息类型 - - Args: - message_info: BaseMessageInfo 对象 - - Returns: - str | None: JSON 字符串格式的 additional_config,如果为空则返回 None - """ - import orjson - - # 首先获取adapter传递的additional_config - additional_config_data = {} - if hasattr(message_info, 'additional_config') and message_info.additional_config: - if isinstance(message_info.additional_config, dict): - additional_config_data = message_info.additional_config.copy() - elif isinstance(message_info.additional_config, str): - # 如果是字符串,尝试解析 - try: - additional_config_data = orjson.loads(message_info.additional_config) - except Exception as e: - logger.warning(f"无法解析 additional_config JSON: {e}") - additional_config_data = {} - - # 然后添加format_info到additional_config中 - if hasattr(message_info, 'format_info') and message_info.format_info: - try: - format_info_dict = message_info.format_info.to_dict() - additional_config_data["format_info"] = format_info_dict - logger.debug(f"嵌入 format_info 到 additional_config: {format_info_dict}") - except Exception as e: - logger.warning(f"将 format_info 转换为字典失败: {e}") - else: - logger.warning(f"[问题] 消息缺少 format_info: message_id={getattr(message_info, 'message_id', 'unknown')}") - logger.warning("[问题] 这可能导致 Action 无法正确检查适配器支持的类型") - - # 序列化为JSON字符串 - if additional_config_data: - try: - return orjson.dumps(additional_config_data).decode("utf-8") - except Exception as e: - logger.error(f"序列化 additional_config 失败: {e}") - return None - return None - def _safe_get_actions(self, message: "MessageRecv") -> list | None: """安全获取消息的actions字段""" - import json - try: actions = getattr(message, "actions", None) if actions is None: @@ -314,6 +263,8 @@ class ChatStream: # 如果是字符串,尝试解析为JSON if isinstance(actions, str): try: + import json + actions = json.loads(actions) except json.JSONDecodeError: logger.warning(f"无法解析actions JSON字符串: {actions}") diff --git a/src/chat/message_receive/optimized_chat_stream.py b/src/chat/message_receive/optimized_chat_stream.py index 2f7059a3e..bc59631e6 100644 --- a/src/chat/message_receive/optimized_chat_stream.py +++ b/src/chat/message_receive/optimized_chat_stream.py @@ -230,7 +230,7 @@ class OptimizedChatStream: priority_info=json.dumps(getattr(message, "priority_info", None)) if getattr(message, "priority_info", None) else None, - additional_config=self._prepare_additional_config(message_info), + additional_config=getattr(message_info, "additional_config", None), user_id=str(getattr(user_info, "user_id", "")), user_nickname=getattr(user_info, "user_nickname", ""), user_cardname=getattr(user_info, "user_cardname", None), @@ -342,59 +342,8 @@ class OptimizedChatStream: return instance - def _prepare_additional_config(self, message_info) -> str | None: - """ - 准备 additional_config,将 format_info 嵌入其中 - - 这个方法模仿 storage.py 中的逻辑,确保 DatabaseMessages 中的 additional_config - 包含 format_info,使得 action_modifier 能够正确获取适配器支持的消息类型 - - Args: - message_info: BaseMessageInfo 对象 - - Returns: - str | None: JSON 字符串格式的 additional_config,如果为空则返回 None - """ - import orjson - - # 首先获取adapter传递的additional_config - additional_config_data = {} - if hasattr(message_info, 'additional_config') and message_info.additional_config: - if isinstance(message_info.additional_config, dict): - additional_config_data = message_info.additional_config.copy() - elif isinstance(message_info.additional_config, str): - # 如果是字符串,尝试解析 - try: - additional_config_data = orjson.loads(message_info.additional_config) - except Exception as e: - logger.warning(f"无法解析 additional_config JSON: {e}") - additional_config_data = {} - - # 然后添加format_info到additional_config中 - if hasattr(message_info, 'format_info') and message_info.format_info: - try: - format_info_dict = message_info.format_info.to_dict() - additional_config_data["format_info"] = format_info_dict - logger.debug(f"嵌入 format_info 到 additional_config: {format_info_dict}") - except Exception as e: - logger.warning(f"将 format_info 转换为字典失败: {e}") - else: - logger.warning(f"[问题] 消息缺少 format_info: message_id={getattr(message_info, 'message_id', 'unknown')}") - logger.warning("[问题] 这可能导致 Action 无法正确检查适配器支持的类型") - - # 序列化为JSON字符串 - if additional_config_data: - try: - return orjson.dumps(additional_config_data).decode("utf-8") - except Exception as e: - logger.error(f"序列化 additional_config 失败: {e}") - return None - return None - def _safe_get_actions(self, message: "MessageRecv") -> list | None: """安全获取消息的actions字段""" - import json - try: actions = getattr(message, "actions", None) if actions is None: @@ -402,6 +351,8 @@ class OptimizedChatStream: if isinstance(actions, str): try: + import json + actions = json.loads(actions) except json.JSONDecodeError: logger.warning(f"无法解析actions JSON字符串: {actions}") diff --git a/src/chat/message_receive/storage.py b/src/chat/message_receive/storage.py index 174001411..edf9bb9c8 100644 --- a/src/chat/message_receive/storage.py +++ b/src/chat/message_receive/storage.py @@ -99,27 +99,6 @@ class MessageStorage: # 将priority_info字典序列化为JSON字符串,以便存储到数据库的Text字段 priority_info_json = orjson.dumps(priority_info).decode("utf-8") if priority_info else None - # 准备additional_config,包含format_info和其他配置 - additional_config_data = None - - # 首先获取adapter传递的additional_config - if hasattr(message.message_info, 'additional_config') and message.message_info.additional_config: - additional_config_data = message.message_info.additional_config.copy() # 避免修改原始对象 - else: - additional_config_data = {} - - # 然后添加format_info到additional_config中 - if hasattr(message.message_info, 'format_info') and message.message_info.format_info: - format_info_dict = message.message_info.format_info.to_dict() - additional_config_data["format_info"] = format_info_dict - logger.debug(f"保存format_info: {format_info_dict}") - else: - logger.warning(f"[问题] 消息缺少format_info: message_id={getattr(message.message_info, 'message_id', 'unknown')}") - logger.warning("[问题] 这可能导致Action无法正确检查适配器支持的类型") - - # 序列化为JSON字符串以便存储 - additional_config_json = orjson.dumps(additional_config_data).decode("utf-8") if additional_config_data else None - # 获取数据库会话 new_message = Messages( @@ -155,7 +134,6 @@ class MessageStorage: is_command=is_command, key_words=key_words, key_words_lite=key_words_lite, - additional_config=additional_config_json, ) async with get_db_session() as session: session.add(new_message) diff --git a/src/chat/planner_actions/action_manager.py b/src/chat/planner_actions/action_manager.py index e7ff21ad4..90d2b265e 100644 --- a/src/chat/planner_actions/action_manager.py +++ b/src/chat/planner_actions/action_manager.py @@ -165,6 +165,7 @@ class ChatterActionManager: 执行结果 """ + chat_stream = None try: logger.debug(f"🎯 [ActionManager] execute_action接收到 target_message: {target_message}") # 通过chat_id获取chat_stream @@ -180,6 +181,9 @@ class ChatterActionManager: "error": "chat_stream not found", } + # 设置正在回复的状态 + chat_stream.context_manager.context.is_replying = True + if action_name == "no_action": return {"action_type": "no_action", "success": True, "reply_text": "", "command": ""} @@ -205,7 +209,7 @@ class ChatterActionManager: action_build_into_prompt=False, action_prompt_display=reason, action_done=True, - thinking_id=thinking_id, + thinking_id=thinking_id or "", action_data={"reason": reason}, action_name="no_reply", ) @@ -298,6 +302,10 @@ class ChatterActionManager: "loop_info": None, "error": str(e), } + finally: + # 确保重置正在回复的状态 + if chat_stream: + chat_stream.context_manager.context.is_replying = False async def _record_action_to_message(self, chat_stream, action_name, target_message, action_data): """ diff --git a/src/chat/planner_actions/action_modifier.py b/src/chat/planner_actions/action_modifier.py index d48af9761..35a17d675 100644 --- a/src/chat/planner_actions/action_modifier.py +++ b/src/chat/planner_actions/action_modifier.py @@ -4,8 +4,6 @@ import random import time from typing import TYPE_CHECKING, Any -import orjson - from src.chat.message_receive.chat_stream import get_chat_manager from src.chat.planner_actions.action_manager import ChatterActionManager from src.chat.utils.chat_message_builder import build_readable_messages, get_raw_msg_before_timestamp_with_chat @@ -184,133 +182,13 @@ class ActionModifier: def _check_action_associated_types(self, all_actions: dict[str, ActionInfo], chat_context: StreamContext): type_mismatched_actions: list[tuple[str, str]] = [] for action_name, action_info in all_actions.items(): - if action_info.associated_types and not self._check_action_output_types(action_info.associated_types, chat_context): + if action_info.associated_types and not chat_context.check_types(action_info.associated_types): associated_types_str = ", ".join(action_info.associated_types) reason = f"适配器不支持(需要: {associated_types_str})" type_mismatched_actions.append((action_name, reason)) logger.debug(f"{self.log_prefix}决定移除动作: {action_name},原因: {reason}") return type_mismatched_actions - def _check_action_output_types(self, output_types: list[str], chat_context: StreamContext) -> bool: - """ - 检查Action的输出类型是否被当前适配器支持 - - Args: - output_types: Action需要输出的消息类型列表 - chat_context: 聊天上下文 - - Returns: - bool: 如果所有输出类型都支持则返回True - """ - # 获取当前适配器支持的输出类型 - adapter_supported_types = self._get_adapter_supported_output_types(chat_context) - - # 检查所有需要的输出类型是否都被支持 - for output_type in output_types: - if output_type not in adapter_supported_types: - logger.debug(f"适配器不支持输出类型 '{output_type}',支持的类型: {adapter_supported_types}") - return False - return True - - def _get_adapter_supported_output_types(self, chat_context: StreamContext) -> list[str]: - """ - 获取当前适配器支持的输出类型列表 - - Args: - chat_context: 聊天上下文 - - Returns: - list[str]: 支持的输出类型列表 - """ - # 检查additional_config是否存在且不为空 - additional_config = None - has_additional_config = False - - # 先检查 current_message 是否存在 - if not chat_context.current_message: - logger.warning(f"{self.log_prefix} [问题] chat_context.current_message 为 None,无法获取适配器支持的类型") - return ["text", "emoji"] # 返回基础类型 - - if hasattr(chat_context.current_message, "additional_config"): - additional_config = chat_context.current_message.additional_config - - # 更准确的非空判断 - if additional_config is not None: - if isinstance(additional_config, str) and additional_config.strip(): - has_additional_config = True - elif isinstance(additional_config, dict): - # 字典存在就可以,即使为空也可能有format_info字段 - has_additional_config = True - else: - logger.warning(f"{self.log_prefix} [问题] current_message 没有 additional_config 属性") - - logger.debug(f"{self.log_prefix} [调试] has_additional_config: {has_additional_config}") - - if has_additional_config: - try: - logger.debug(f"{self.log_prefix} [调试] 开始解析 additional_config") - format_info = None - - # 处理additional_config可能是字符串或字典的情况 - if isinstance(additional_config, str): - # 如果是字符串,尝试解析为JSON - try: - config = orjson.loads(additional_config) - format_info = config.get("format_info") - except (orjson.JSONDecodeError, AttributeError, TypeError) as e: - format_info = None - - elif isinstance(additional_config, dict): - # 如果是字典,直接获取format_info - format_info = additional_config.get("format_info") - - # 如果找到了format_info,从中提取支持的类型 - if format_info: - if "accept_format" in format_info: - accept_format = format_info["accept_format"] - if isinstance(accept_format, str): - accept_format = [accept_format] - elif isinstance(accept_format, list): - pass - else: - accept_format = list(accept_format) if hasattr(accept_format, "__iter__") else [] - - # 合并基础类型和适配器特定类型 - result = list(set(accept_format)) - return result - - # 备用检查content_format字段 - elif "content_format" in format_info: - content_format = format_info["content_format"] - logger.debug(f"{self.log_prefix} [调试] 找到 content_format: {content_format}") - if isinstance(content_format, str): - content_format = [content_format] - elif isinstance(content_format, list): - pass - else: - content_format = list(content_format) if hasattr(content_format, "__iter__") else [] - - result = list(set(content_format)) - return result - else: - logger.warning(f"{self.log_prefix} [问题] additional_config 中没有 format_info 字段") - except Exception as e: - logger.error(f"{self.log_prefix} [问题] 解析适配器格式信息失败: {e}", exc_info=True) - else: - logger.warning(f"{self.log_prefix} [问题] additional_config 不存在或为空") - - # 如果无法获取格式信息,返回默认支持的基础类型 - default_types = ["text", "emoji"] - logger.warning( - f"{self.log_prefix} [问题] 无法从适配器获取支持的消息类型,使用默认类型: {default_types}" - ) - logger.warning( - f"{self.log_prefix} [问题] 这可能导致某些 Action 被错误地过滤。" - f"请检查适配器是否正确设置了 format_info。" - ) - return default_types - - async def _get_deactivated_actions_by_type( self, actions_with_info: dict[str, ActionInfo], diff --git a/src/chat/replyer/default_generator.py b/src/chat/replyer/default_generator.py index 569b225c4..d15dc9589 100644 --- a/src/chat/replyer/default_generator.py +++ b/src/chat/replyer/default_generator.py @@ -162,62 +162,6 @@ If you need to use the search tool, please directly call the function "lpmm_sear name="lpmm_get_knowledge_prompt", ) - # normal 版 prompt 模板(0.9之前的简化模式) - logger.debug("[Prompt模式调试] 正在注册normal_style_prompt模板") - Prompt( - """ -{chat_scene} - -**重要:消息针对性判断** -在回应之前,首先分析消息的针对性: -1. **直接针对你**:@你、回复你、明确询问你 → 必须回应 -2. **间接相关**:涉及你感兴趣的话题但未直接问你 → 谨慎参与 -3. **他人对话**:与你无关的私人交流 → 通常不参与 -4. **重复内容**:他人已充分回答的问题 → 避免重复 - -{expression_habits_block} -{tool_info_block} -{knowledge_prompt} -{memory_block} -{relation_info_block} -{extra_info_block} - -{notice_block} - -{cross_context_block} -{identity} -如果有人说你是人机,你可以用一种阴阳怪气的口吻来回应 -{schedule_block} - -{action_descriptions} - -下面是群里最近的聊天内容: --------------------------------- -{time_block} -{chat_info} --------------------------------- - -{reply_target_block} - -你现在的心情是:{mood_state} -{config_expression_style} -注意不要复读你前面发过的内容,意思相近也不行。 -{keywords_reaction_prompt} -请注意不要输出多余内容(包括前后缀,冒号和引号,at或 @等 )。只输出回复内容。 -{moderation_prompt} -你的核心任务是针对 {reply_target_block} 中提到的内容,{relation_info_block}生成一段紧密相关且能推动对话的回复。你的回复应该: -1. 明确回应目标消息,而不是宽泛地评论。 -2. 可以分享你的看法、提出相关问题,或者开个合适的玩笑。 -3. 目的是让对话更有趣、更深入。 -最终请输出一条简短、完整且口语化的回复。 - -*你叫{bot_name},也有人叫你{bot_nickname}* - -现在,你说: -""", - "normal_style_prompt", - ) - logger.debug("[Prompt模式调试] normal_style_prompt模板注册完成") class DefaultReplyer: @@ -1329,7 +1273,7 @@ class DefaultReplyer: ), "cross_context": asyncio.create_task( self._time_and_run_task( - Prompt.build_cross_context(chat_id, global_config.personality.prompt_mode, target_user_info), + Prompt.build_cross_context(chat_id, "s4u", target_user_info), "cross_context", ) ), @@ -1503,9 +1447,6 @@ class DefaultReplyer: else: reply_target_block = "" - # 根据配置选择模板 - current_prompt_mode = global_config.personality.prompt_mode - # 动态生成聊天场景提示 if is_group_chat: chat_scene_prompt = "你正在一个QQ群里聊天,你需要理解整个群的聊天动态和话题走向,并做出自然的回应。" @@ -1524,7 +1465,7 @@ class DefaultReplyer: available_actions=available_actions, enable_tool=enable_tool, chat_target_info=self.chat_target_info, - prompt_mode=current_prompt_mode, + prompt_mode="s4u", message_list_before_now_long=message_list_before_now_long, message_list_before_short=message_list_before_short, chat_talking_prompt_short=chat_talking_prompt_short, @@ -1552,13 +1493,7 @@ class DefaultReplyer: ) # 使用新的统一Prompt系统 - 使用正确的模板名称 - template_name = "" - if current_prompt_mode == "s4u": - template_name = "s4u_style_prompt" - elif current_prompt_mode == "normal": - template_name = "normal_style_prompt" - elif current_prompt_mode == "minimal": - template_name = "default_expressor_prompt" + template_name = "s4u_style_prompt" # 获取模板内容 template_prompt = await global_prompt_manager.get_prompt_async(template_name) diff --git a/src/chat/utils/prompt.py b/src/chat/utils/prompt.py index fa9249bbd..b791966d9 100644 --- a/src/chat/utils/prompt.py +++ b/src/chat/utils/prompt.py @@ -130,19 +130,15 @@ class PromptManager: # 确保我们有有效的parameters实例 params_for_injection = parameters or original_prompt.parameters - # 应用所有匹配的注入规则,获取修改后的模板 - modified_template = await prompt_component_manager.apply_injections( - target_prompt_name=original_prompt.name, - original_template=original_prompt.template, - params=params_for_injection, + components_prefix = await prompt_component_manager.execute_components_for( + injection_point=original_prompt.name, params=params_for_injection ) - - # 如果模板被修改了,就创建一个新的临时Prompt实例 - if modified_template != original_prompt.template: - logger.info(f"为'{name}'应用了Prompt注入规则") + if components_prefix: + logger.info(f"为'{name}'注入插件内容: \n{components_prefix}") # 创建一个新的临时Prompt实例,不进行注册 + new_template = f"{components_prefix}\n\n{original_prompt.template}" temp_prompt = Prompt( - template=modified_template, + template=new_template, name=original_prompt.name, parameters=original_prompt.parameters, should_register=False, # 确保不重新注册 @@ -400,8 +396,6 @@ class Prompt: # 构建聊天历史 if self.parameters.prompt_mode == "s4u": await self._build_s4u_chat_context(context_data) - else: - await self._build_normal_chat_context(context_data) # 补充基础信息 context_data.update( @@ -444,13 +438,6 @@ class Prompt: context_data["read_history_prompt"] = read_history_prompt context_data["unread_history_prompt"] = unread_history_prompt - async def _build_normal_chat_context(self, context_data: dict[str, Any]) -> None: - """构建normal模式的聊天上下文""" - if not self.parameters.chat_talking_prompt_short: - return - - context_data["chat_info"] = f"""群里的聊天内容: -{self.parameters.chat_talking_prompt_short}""" async def _build_s4u_chat_history_prompts( self, message_list_before_now: list[dict[str, Any]], target_user_id: str, sender: str, chat_id: str @@ -790,8 +777,6 @@ class Prompt: """使用上下文数据格式化模板""" if self.parameters.prompt_mode == "s4u": params = self._prepare_s4u_params(context_data) - elif self.parameters.prompt_mode == "normal": - params = self._prepare_normal_params(context_data) else: params = self._prepare_default_params(context_data) @@ -827,34 +812,6 @@ class Prompt: or "你正在一个QQ群里聊天,你需要理解整个群的聊天动态和话题走向,并做出自然的回应。", } - def _prepare_normal_params(self, context_data: dict[str, Any]) -> dict[str, Any]: - """准备Normal模式的参数""" - return { - **context_data, - "expression_habits_block": context_data.get("expression_habits_block", ""), - "tool_info_block": context_data.get("tool_info_block", ""), - "knowledge_prompt": context_data.get("knowledge_prompt", ""), - "memory_block": context_data.get("memory_block", ""), - "relation_info_block": context_data.get("relation_info_block", ""), - "extra_info_block": self.parameters.extra_info_block or context_data.get("extra_info_block", ""), - "cross_context_block": context_data.get("cross_context_block", ""), - "notice_block": self.parameters.notice_block or context_data.get("notice_block", ""), - "identity": self.parameters.identity_block or context_data.get("identity", ""), - "action_descriptions": self.parameters.action_descriptions or context_data.get("action_descriptions", ""), - "schedule_block": self.parameters.schedule_block or context_data.get("schedule_block", ""), - "time_block": context_data.get("time_block", ""), - "chat_info": context_data.get("chat_info", ""), - "reply_target_block": context_data.get("reply_target_block", ""), - "config_expression_style": global_config.personality.reply_style, - "mood_state": self.parameters.mood_prompt or context_data.get("mood_state", ""), - "keywords_reaction_prompt": self.parameters.keywords_reaction_prompt - or context_data.get("keywords_reaction_prompt", ""), - "moderation_prompt": self.parameters.moderation_prompt_block or context_data.get("moderation_prompt", ""), - "safety_guidelines_block": self.parameters.safety_guidelines_block - or context_data.get("safety_guidelines_block", ""), - "chat_scene": self.parameters.chat_scene - or "你正在一个QQ群里聊天,你需要理解整个群的聊天动态和话题走向,并做出自然的回应。", - } def _prepare_default_params(self, context_data: dict[str, Any]) -> dict[str, Any]: """准备默认模式的参数""" @@ -1025,12 +982,7 @@ class Prompt: if not chat_stream: return "" - if prompt_mode == "normal": - context_group = await cross_context_api.get_context_group(chat_id) - if not context_group: - return "" - return await cross_context_api.build_cross_context_normal(chat_stream, context_group) - elif prompt_mode == "s4u": + if prompt_mode == "s4u": return await cross_context_api.build_cross_context_s4u(chat_stream, target_user_info) return "" @@ -1083,12 +1035,12 @@ async def create_prompt_async( # 动态注入插件内容 if name: - modified_template = await prompt_component_manager.apply_injections( - target_prompt_name=name, original_template=template, params=final_params + components_prefix = await prompt_component_manager.execute_components_for( + injection_point=name, params=final_params ) - if modified_template != template: - logger.debug(f"为'{name}'应用了Prompt注入规则") - template = modified_template + if components_prefix: + logger.debug(f"为'{name}'注入插件内容: \n{components_prefix}") + template = f"{components_prefix}\n\n{template}" # 使用可能已修改的模板创建实例 prompt = create_prompt(template, name, final_params) diff --git a/src/chat/utils/prompt_component_manager.py b/src/chat/utils/prompt_component_manager.py index c1fb92e13..765a81416 100644 --- a/src/chat/utils/prompt_component_manager.py +++ b/src/chat/utils/prompt_component_manager.py @@ -1,11 +1,9 @@ import asyncio -import re -from typing import Type from src.chat.utils.prompt_params import PromptParameters from src.common.logger import get_logger from src.plugin_system.base.base_prompt import BasePrompt -from src.plugin_system.base.component_types import ComponentType, InjectionRule, InjectionType, PromptInfo +from src.plugin_system.base.component_types import ComponentType, PromptInfo from src.plugin_system.core.component_registry import component_registry logger = get_logger("prompt_component_manager") @@ -21,143 +19,89 @@ class PromptComponentManager: 3. 提供一个接口,以便在构建核心Prompt时,能够获取并执行所有相关的组件。 """ - def _get_rules_for(self, target_prompt_name: str) -> list[tuple[InjectionRule, Type[BasePrompt]]]: + def get_components_for(self, injection_point: str) -> list[type[BasePrompt]]: """ - 获取指定目标Prompt的所有注入规则及其关联的组件类。 + 获取指定注入点的所有已注册组件类。 Args: - target_prompt_name (str): 目标 Prompt 的名称。 + injection_point: 目标Prompt的名称。 Returns: - list[tuple[InjectionRule, Type[BasePrompt]]]: 一个元组列表, - 每个元组包含一个注入规则和其对应的 Prompt 组件类,并已根据优先级排序。 + list[Type[BasePrompt]]: 与该注入点关联的组件类列表。 """ - # 从注册表中获取所有已启用的 PROMPT 类型的组件 + # 从组件注册中心获取所有启用的Prompt组件 enabled_prompts = component_registry.get_enabled_components_by_type(ComponentType.PROMPT) - matching_rules = [] - # 遍历所有启用的 Prompt 组件,查找与目标 Prompt 相关的注入规则 + matching_components: list[type[BasePrompt]] = [] + for prompt_name, prompt_info in enabled_prompts.items(): + # 确保 prompt_info 是 PromptInfo 类型 if not isinstance(prompt_info, PromptInfo): continue - # prompt_info.injection_rules 已经经过了后向兼容处理,确保总是列表 - for rule in prompt_info.injection_rules: - # 如果规则的目标是当前指定的 Prompt - if rule.target_prompt == target_prompt_name: - # 获取该规则对应的组件类 - component_class = component_registry.get_component_class(prompt_name, ComponentType.PROMPT) - # 确保获取到的确实是一个 BasePrompt 的子类 - if component_class and issubclass(component_class, BasePrompt): - matching_rules.append((rule, component_class)) + # 获取注入点信息 + injection_points = prompt_info.injection_point + if isinstance(injection_points, str): + injection_points = [injection_points] - # 根据规则的优先级进行排序,数字越小,优先级越高,越先应用 - matching_rules.sort(key=lambda x: x[0].priority) - return matching_rules + # 检查当前注入点是否匹配 + if injection_point in injection_points: + # 获取组件类 + component_class = component_registry.get_component_class(prompt_name, ComponentType.PROMPT) + if component_class and issubclass(component_class, BasePrompt): + matching_components.append(component_class) - async def apply_injections( - self, target_prompt_name: str, original_template: str, params: PromptParameters - ) -> str: + return matching_components + + async def execute_components_for(self, injection_point: str, params: PromptParameters) -> str: """ - 获取、实例化并执行所有相关组件,然后根据注入规则修改原始模板。 - - 这是一个三步走的过程: - 1. 实例化所有需要执行的组件。 - 2. 并行执行它们的 `execute` 方法以获取注入内容。 - 3. 按照优先级顺序,将内容注入到原始模板中。 + 实例化并执行指定注入点的所有组件,然后将它们的输出拼接成一个字符串。 Args: - target_prompt_name (str): 目标 Prompt 的名称。 - original_template (str): 原始的、未经修改的 Prompt 模板字符串。 - params (PromptParameters): 传递给 Prompt 组件实例的参数。 + injection_point: 目标Prompt的名称。 + params: 用于初始化组件的 PromptParameters 对象。 Returns: - str: 应用了所有注入规则后,修改过的 Prompt 模板字符串。 + str: 所有相关组件生成的、用换行符连接的文本内容。 """ - rules_with_classes = self._get_rules_for(target_prompt_name) - # 如果没有找到任何匹配的规则,就直接返回原始模板,啥也不干 - if not rules_with_classes: - return original_template - - # --- 第一步: 实例化所有需要执行的组件 --- - instance_map = {} # 存储组件实例,虽然目前没直接用,但留着总没错 - tasks = [] # 存放所有需要并行执行的 execute 异步任务 - components_to_execute = [] # 存放需要执行的组件类,用于后续结果映射 - - for rule, component_class in rules_with_classes: - # 如果注入类型是 REMOVE,那就不需要执行组件了,因为它不产生内容 - if rule.injection_type != InjectionType.REMOVE: - try: - # 获取组件的元信息,主要是为了拿到插件名称来读取插件配置 - prompt_info = component_registry.get_component_info( - component_class.prompt_name, ComponentType.PROMPT - ) - if not isinstance(prompt_info, PromptInfo): - plugin_config = {} - else: - # 从注册表获取该组件所属插件的配置 - plugin_config = component_registry.get_plugin_config(prompt_info.plugin_name) - - # 实例化组件,并传入参数和插件配置 - instance = component_class(params=params, plugin_config=plugin_config) - instance_map[component_class.prompt_name] = instance - # 将组件的 execute 方法作为一个任务添加到列表中 - tasks.append(instance.execute()) - components_to_execute.append(component_class) - except Exception as e: - logger.error(f"实例化 Prompt 组件 '{component_class.prompt_name}' 失败: {e}") - # 即使失败,也添加一个立即完成的空任务,以保持与其他任务的索引同步 - tasks.append(asyncio.create_task(asyncio.sleep(0, result=e))) # type: ignore - - # --- 第二步: 并行执行所有组件的 execute 方法 --- - # 使用 asyncio.gather 来同时运行所有任务,提高效率 - results = await asyncio.gather(*tasks, return_exceptions=True) - # 创建一个从组件名到执行结果的映射,方便后续查找 - result_map = { - components_to_execute[i].prompt_name: res - for i, res in enumerate(results) - if not isinstance(res, Exception) # 只包含成功的结果 - } - # 单独处理并记录执行失败的组件 - for i, res in enumerate(results): - if isinstance(res, Exception): - logger.error(f"执行 Prompt 组件 '{components_to_execute[i].prompt_name}' 失败: {res}") - - # --- 第三步: 按优先级顺序应用注入规则 --- - modified_template = original_template - for rule, component_class in rules_with_classes: - # 从结果映射中获取该组件生成的内容 - content = result_map.get(component_class.prompt_name) + component_classes = self.get_components_for(injection_point) + if not component_classes: + return "" + tasks = [] + for component_class in component_classes: try: - if rule.injection_type == InjectionType.PREPEND: - if content: - modified_template = f"{content}\n{modified_template}" - elif rule.injection_type == InjectionType.APPEND: - if content: - modified_template = f"{modified_template}\n{content}" - elif rule.injection_type == InjectionType.REPLACE: - # 使用正则表达式替换目标内容 - if content and rule.target_content: - modified_template = re.sub(rule.target_content, str(content), modified_template) - elif rule.injection_type == InjectionType.INSERT_AFTER: - # 在匹配到的内容后面插入 - if content and rule.target_content: - # re.sub a little trick: \g<0> represents the entire matched string - replacement = f"\\g<0>\n{content}" - modified_template = re.sub(rule.target_content, replacement, modified_template) - elif rule.injection_type == InjectionType.REMOVE: - # 使用正则表达式移除目标内容 - if rule.target_content: - modified_template = re.sub(rule.target_content, "", modified_template) - except re.error as e: - logger.error( - f"在为 '{component_class.prompt_name}' 应用规则时发生正则错误: {e} (pattern: '{rule.target_content}')" + # 从注册中心获取组件信息 + prompt_info = component_registry.get_component_info( + component_class.prompt_name, ComponentType.PROMPT ) - except Exception as e: - logger.error(f"应用 Prompt 注入规则 '{rule}' 失败: {e}") + if not isinstance(prompt_info, PromptInfo): + logger.warning(f"找不到 Prompt 组件 '{component_class.prompt_name}' 的信息,无法获取插件配置") + plugin_config = {} + else: + plugin_config = component_registry.get_plugin_config(prompt_info.plugin_name) - return modified_template + instance = component_class(params=params, plugin_config=plugin_config) + tasks.append(instance.execute()) + except Exception as e: + logger.error(f"实例化 Prompt 组件 '{component_class.prompt_name}' 失败: {e}") + + if not tasks: + return "" + + # 并行执行所有组件 + results = await asyncio.gather(*tasks, return_exceptions=True) + + # 过滤掉执行失败的结果和空字符串 + valid_results = [] + for i, result in enumerate(results): + if isinstance(result, Exception): + logger.error(f"执行 Prompt 组件 '{component_classes[i].prompt_name}' 失败: {result}") + elif result and isinstance(result, str) and result.strip(): + valid_results.append(result.strip()) + + # 使用换行符拼接所有有效结果 + return "\n".join(valid_results) # 创建全局单例 diff --git a/src/config/official_configs.py b/src/config/official_configs.py index 299c2b291..09e261132 100644 --- a/src/config/official_configs.py +++ b/src/config/official_configs.py @@ -64,7 +64,6 @@ class PersonalityConfig(ValidatedConfigBase): default_factory=list, description="安全与互动底线,Bot在任何情况下都必须遵守的原则" ) reply_style: str = Field(default="", description="表达风格") - prompt_mode: Literal["s4u", "normal"] = Field(default="s4u", description="Prompt模式") compress_personality: bool = Field(default=True, description="是否压缩人格") compress_identity: bool = Field(default=True, description="是否压缩身份") @@ -632,7 +631,6 @@ class CrossContextConfig(ValidatedConfigBase): # --- Normal模式: 共享组配置 --- groups: list[ContextGroup] = Field(default_factory=list, description="上下文共享组列表") - # --- S4U模式: 用户中心上下文检索 --- s4u_mode: Literal["whitelist", "blacklist"] = Field( default="whitelist", diff --git a/src/main.py b/src/main.py index e36bd4960..46c6ca66d 100644 --- a/src/main.py +++ b/src/main.py @@ -511,8 +511,10 @@ MoFox_Bot(第三方修改版) logger.error(f"月度计划管理器初始化失败: {e}") # 初始化日程管理器 + if global_config.planning_system.schedule_enable: try: - await schedule_manager.initialize() + await schedule_manager.load_or_generate_today_schedule() + await schedule_manager.start_daily_schedule_generation() logger.info("日程表管理器初始化成功") except Exception as e: logger.error(f"日程表管理器初始化失败: {e}") diff --git a/src/plugin_system/apis/schedule_api.py b/src/plugin_system/apis/schedule_api.py index 61c5d13f4..2b456456c 100644 --- a/src/plugin_system/apis/schedule_api.py +++ b/src/plugin_system/apis/schedule_api.py @@ -1,180 +1,330 @@ """ -日程表与月度计划API模块 +日程表与月度计划查询API模块 -专门负责日程和月度计划信息的查询与管理,采用标准Python包设计模式 -所有对外接口均为异步函数,以便于插件开发者在异步环境中使用。 +本模块提供了一系列用于查询日程和月度计划的只读接口。 +所有对外接口均为异步函数,专为插件开发者设计,以便在异步环境中无缝集成。 + +核心功能: +- 查询指定日期的日程安排。 +- 获取当前正在进行的活动。 +- 筛选特定时间范围内的活动。 +- 查询月度计划,支持随机抽样和计数。 +- 所有查询接口均提供格式化输出选项。 使用方式: import asyncio from src.plugin_system.apis import schedule_api async def main(): - # 获取今日日程 - today_schedule = await schedule_api.get_today_schedule() + # 获取今天的日程(原始数据) + today_schedule = await schedule_api.get_schedule() if today_schedule: print("今天的日程:", today_schedule) + # 获取昨天的日程,并格式化为字符串 + from datetime import datetime, timedelta + yesterday = (datetime.now() - timedelta(days=1)).strftime("%Y-%m-%d") + formatted_schedule = await schedule_api.get_schedule(date=yesterday, formatted=True) + if formatted_schedule: + print(f"\\n昨天的日程 (格式化):\\n{formatted_schedule}") + # 获取当前活动 current_activity = await schedule_api.get_current_activity() if current_activity: - print("当前活动:", current_activity) + print(f"\\n当前活动: {current_activity.get('activity')}") - # 获取本月月度计划 - from datetime import datetime - this_month = datetime.now().strftime("%Y-%m") - plans = await schedule_api.get_monthly_plans(this_month) - if plans: - print(f"{this_month} 的月度计划:", [p.plan_text for p in plans]) + # 获取本月月度计划总数 + plan_count = await schedule_api.count_monthly_plans() + print(f"\\n本月月度计划总数: {plan_count}") + + # 随机获取本月的2个计划 + random_plans = await schedule_api.get_monthly_plans(random_count=2) + if random_plans: + print("\\n随机的2个计划:", [p.plan_text for p in random_plans]) asyncio.run(main()) """ +import random from datetime import datetime from typing import Any -from src.common.database.sqlalchemy_models import MonthlyPlan +import orjson +from sqlalchemy import func, select + +from src.common.database.sqlalchemy_models import MonthlyPlan, Schedule, get_db_session from src.common.logger import get_logger from src.schedule.database import get_active_plans_for_month -from src.schedule.schedule_manager import schedule_manager logger = get_logger("schedule_api") +# --- 内部辅助函数 --- + +def _format_schedule_list( + items: list[dict[str, Any]] | list[MonthlyPlan], + template: str, + item_type: str, +) -> str: + """将日程或计划列表格式化为字符串""" + if not items: + return "无" + + lines = [] + for item in items: + if item_type == "schedule" and isinstance(item, dict): + lines.append(template.format(time_range=item.get("time_range", ""), activity=item.get("activity", ""))) + elif item_type == "plan" and isinstance(item, MonthlyPlan): + lines.append(template.format(plan_text=item.plan_text)) + return "\\n".join(lines) + + +async def _get_schedule_from_db(date_str: str) -> list[dict[str, Any]] | None: + """从数据库中获取并解析指定日期的日程""" + async with get_db_session() as session: + result = await session.execute(select(Schedule).filter(Schedule.date == date_str)) + schedule_record = result.scalars().first() + if schedule_record and schedule_record.schedule_data: + try: + return orjson.loads(str(schedule_record.schedule_data)) + except orjson.JSONDecodeError: + logger.warning(f"无法解析数据库中的日程数据 (日期: {date_str})") + return None + + +# --- API实现 --- + + class ScheduleAPI: - """日程表与月度计划API - 负责日程和计划信息的查询与管理""" + """日程表与月度计划查询API""" @staticmethod - async def get_today_schedule() -> list[dict[str, Any]] | None: - """(异步) 获取今天的日程安排 + async def get_schedule( + date: str | None = None, + formatted: bool = False, + format_template: str = "{time_range}: {activity}", + ) -> list[dict[str, Any]] | str | None: + """ + (异步) 获取指定日期的日程安排。 + + Args: + date (Optional[str]): 目标日期,格式 "YYYY-MM-DD"。如果为None,则使用当前日期。 + formatted (bool): 如果为True,返回格式化的字符串;否则返回原始数据列表。 + format_template (str): 当 formatted=True 时使用的格式化模板。 Returns: - Optional[List[Dict[str, Any]]]: 今天的日程列表,如果未生成或未启用则返回None + Union[List[Dict[str, Any]], str, None]: 日程数据或None。 """ + target_date = date or datetime.now().strftime("%Y-%m-%d") try: - logger.debug("[ScheduleAPI] 正在获取今天的日程安排...") - return schedule_manager.today_schedule + logger.debug(f"[ScheduleAPI] 正在获取 {target_date} 的日程安排...") + schedule_data = await _get_schedule_from_db(target_date) + if schedule_data is None: + return None + if formatted: + return _format_schedule_list(schedule_data, format_template, "schedule") + return schedule_data except Exception as e: - logger.error(f"[ScheduleAPI] 获取今日日程失败: {e}") + logger.error(f"[ScheduleAPI] 获取 {target_date} 日程失败: {e}") return None @staticmethod - async def get_current_activity() -> str | None: - """(异步) 获取当前正在进行的活动 + async def get_current_activity( + formatted: bool = False, + format_template: str = "{time_range}: {activity}", + ) -> dict[str, Any] | str | None: + """ + (异步) 获取当前正在进行的活动。 + + Args: + formatted (bool): 如果为True,返回格式化的字符串;否则返回活动字典。 + format_template (str): 当 formatted=True 时使用的格式化模板。 Returns: - Optional[str]: 当前活动名称,如果没有则返回None + Union[Dict[str, Any], str, None]: 当前活动数据或None。 """ try: logger.debug("[ScheduleAPI] 正在获取当前活动...") - return schedule_manager.get_current_activity() + today_schedule = await _get_schedule_from_db(datetime.now().strftime("%Y-%m-%d")) + if not today_schedule: + return None + + now = datetime.now().time() + for event in today_schedule: + time_range = event.get("time_range") + if not time_range: + continue + try: + start_str, end_str = time_range.split("-") + start_time = datetime.strptime(start_str.strip(), "%H:%M").time() + end_time = datetime.strptime(end_str.strip(), "%H:%M").time() + if (start_time <= now < end_time) or \ + (end_time < start_time and (now >= start_time or now < end_time)): + if formatted: + return _format_schedule_list([event], format_template, "schedule") + return event + except (ValueError, KeyError): + continue + return None except Exception as e: logger.error(f"[ScheduleAPI] 获取当前活动失败: {e}") return None @staticmethod - async def regenerate_schedule() -> bool: - """(异步) 触发后台重新生成今天的日程 - - Returns: - bool: 是否成功触发 + async def get_activities_between( + start_time: str, + end_time: str, + date: str | None = None, + formatted: bool = False, + format_template: str = "{time_range}: {activity}", + ) -> list[dict[str, Any]] | str | None: """ - try: - logger.info("[ScheduleAPI] 正在触发后台重新生成日程...") - await schedule_manager.generate_and_save_schedule() - return True - except Exception as e: - logger.error(f"[ScheduleAPI] 触发日程重新生成失败: {e}") - return False - - @staticmethod - async def get_monthly_plans(target_month: str | None = None) -> list[MonthlyPlan]: - """(异步) 获取指定月份的有效月度计划 + (异步) 获取指定日期和时间范围内的所有活动。 Args: - target_month (Optional[str]): 目标月份,格式为 "YYYY-MM"。如果为None,则使用当前月份。 + start_time (str): 开始时间,格式 "HH:MM"。 + end_time (str): 结束时间,格式 "HH:MM"。 + date (Optional[str]): 目标日期,格式 "YYYY-MM-DD"。如果为None,则使用当前日期。 + formatted (bool): 如果为True,返回格式化的字符串;否则返回活动列表。 + format_template (str): 当 formatted=True 时使用的格式化模板。 Returns: - List[MonthlyPlan]: 月度计划对象列表 + Union[List[Dict[str, Any]], str, None]: 在时间范围内的活动列表或None。 """ - if target_month is None: - target_month = datetime.now().strftime("%Y-%m") + target_date = date or datetime.now().strftime("%Y-%m-%d") try: - logger.debug(f"[ScheduleAPI] 正在获取 {target_month} 的月度计划...") - return await get_active_plans_for_month(target_month) + logger.debug(f"[ScheduleAPI] 正在获取 {target_date} 从 {start_time} 到 {end_time} 的活动...") + schedule_data = await _get_schedule_from_db(target_date) + if not schedule_data: + return None + + start = datetime.strptime(start_time, "%H:%M").time() + end = datetime.strptime(end_time, "%H:%M").time() + activities_in_range = [] + + for event in schedule_data: + time_range = event.get("time_range") + if not time_range: + continue + try: + event_start_str, event_end_str = time_range.split("-") + event_start = datetime.strptime(event_start_str.strip(), "%H:%M").time() + if start <= event_start < end: + activities_in_range.append(event) + except (ValueError, KeyError): + continue + + if formatted: + return _format_schedule_list(activities_in_range, format_template, "schedule") + return activities_in_range except Exception as e: - logger.error(f"[ScheduleAPI] 获取 {target_month} 月度计划失败: {e}") - return [] + logger.error(f"[ScheduleAPI] 获取时间段内活动失败: {e}") + return None @staticmethod - async def ensure_monthly_plans(target_month: str | None = None) -> bool: - """(异步) 确保指定月份存在月度计划,如果不存在则触发生成 + async def get_monthly_plans( + target_month: str | None = None, + random_count: int | None = None, + formatted: bool = False, + format_template: str = "- {plan_text}", + ) -> list[MonthlyPlan] | str | None: + """ + (异步) 获取指定月份的有效月度计划。 Args: - target_month (Optional[str]): 目标月份,格式为 "YYYY-MM"。如果为None,则使用当前月份。 + target_month (Optional[str]): 目标月份,格式 "YYYY-MM"。如果为None,则使用当前月份。 + random_count (Optional[int]): 如果设置,将随机返回指定数量的计划。 + formatted (bool): 如果为True,返回格式化的字符串;否则返回对象列表。 + format_template (str): 当 formatted=True 时使用的格式化模板。 Returns: - bool: 操作是否成功 (如果已存在或成功生成) + Union[List[MonthlyPlan], str, None]: 月度计划列表、格式化字符串或None。 """ - if target_month is None: - target_month = datetime.now().strftime("%Y-%m") + month = target_month or datetime.now().strftime("%Y-%m") try: - logger.info(f"[ScheduleAPI] 正在确保 {target_month} 的月度计划存在...") - return await schedule_manager.plan_manager.ensure_and_generate_plans_if_needed(target_month) + logger.debug(f"[ScheduleAPI] 正在获取 {month} 的月度计划...") + plans = await get_active_plans_for_month(month) + if not plans: + return None + + if random_count is not None and random_count > 0 and len(plans) > random_count: + plans = random.sample(plans, random_count) + + if formatted: + return _format_schedule_list(plans, format_template, "plan") + return plans except Exception as e: - logger.error(f"[ScheduleAPI] 确保 {target_month} 月度计划失败: {e}") - return False + logger.error(f"[ScheduleAPI] 获取 {month} 月度计划失败: {e}") + return None @staticmethod - async def archive_monthly_plans(target_month: str | None = None) -> bool: - """(异步) 归档指定月份的月度计划 + async def count_monthly_plans(target_month: str | None = None) -> int: + """ + (异步) 获取指定月份的有效月度计划总数。 Args: - target_month (Optional[str]): 目标月份,格式为 "YYYY-MM"。如果为None,则使用当前月份。 + target_month (Optional[str]): 目标月份,格式 "YYYY-MM"。如果为None,则使用当前月份。 Returns: - bool: 操作是否成功 + int: 有效月度计划的数量。 """ - if target_month is None: - target_month = datetime.now().strftime("%Y-%m") + month = target_month or datetime.now().strftime("%Y-%m") try: - logger.info(f"[ScheduleAPI] 正在归档 {target_month} 的月度计划...") - await schedule_manager.plan_manager.archive_current_month_plans(target_month) - return True + logger.debug(f"[ScheduleAPI] 正在统计 {month} 的月度计划数量...") + async with get_db_session() as session: + result = await session.execute( + select(func.count(MonthlyPlan.id)).where( + MonthlyPlan.target_month == month, MonthlyPlan.status == "active" + ) + ) + return result.scalar_one() or 0 except Exception as e: - logger.error(f"[ScheduleAPI] 归档 {target_month} 月度计划失败: {e}") - return False + logger.error(f"[ScheduleAPI] 统计 {month} 月度计划数量失败: {e}") + return 0 # ============================================================================= # 模块级别的便捷函数 (全部为异步) # ============================================================================= - -async def get_today_schedule() -> list[dict[str, Any]] | None: - """(异步) 获取今天的日程安排的便捷函数""" - return await ScheduleAPI.get_today_schedule() +async def get_schedule( + date: str | None = None, + formatted: bool = False, + format_template: str = "{time_range}: {activity}", +) -> list[dict[str, Any]] | str | None: + """(异步) 获取指定日期的日程安排的便捷函数。""" + return await ScheduleAPI.get_schedule(date, formatted, format_template) -async def get_current_activity() -> str | None: - """(异步) 获取当前正在进行的活动的便捷函数""" - return await ScheduleAPI.get_current_activity() +async def get_current_activity( + formatted: bool = False, + format_template: str = "{time_range}: {activity}", +) -> dict[str, Any] | str | None: + """(异步) 获取当前正在进行的活动的便捷函数。""" + return await ScheduleAPI.get_current_activity(formatted, format_template) -async def regenerate_schedule() -> bool: - """(异步) 触发后台重新生成今天的日程的便捷函数""" - return await ScheduleAPI.regenerate_schedule() +async def get_activities_between( + start_time: str, + end_time: str, + date: str | None = None, + formatted: bool = False, + format_template: str = "{time_range}: {activity}", +) -> list[dict[str, Any]] | str | None: + """(异步) 获取指定时间范围内活动的便捷函数。""" + return await ScheduleAPI.get_activities_between(start_time, end_time, date, formatted, format_template) -async def get_monthly_plans(target_month: str | None = None) -> list[MonthlyPlan]: - """(异步) 获取指定月份的有效月度计划的便捷函数""" - return await ScheduleAPI.get_monthly_plans(target_month) +async def get_monthly_plans( + target_month: str | None = None, + random_count: int | None = None, + formatted: bool = False, + format_template: str = "- {plan_text}", +) -> list[MonthlyPlan] | str | None: + """(异步) 获取月度计划的便捷函数。""" + return await ScheduleAPI.get_monthly_plans(target_month, random_count, formatted, format_template) -async def ensure_monthly_plans(target_month: str | None = None) -> bool: - """(异步) 确保指定月份存在月度计划的便捷函数""" - return await ScheduleAPI.ensure_monthly_plans(target_month) - - -async def archive_monthly_plans(target_month: str | None = None) -> bool: - """(异步) 归档指定月份的月度计划的便捷函数""" - return await ScheduleAPI.archive_monthly_plans(target_month) +async def count_monthly_plans(target_month: str | None = None) -> int: + """(异步) 获取月度计划总数的便捷函数。""" + return await ScheduleAPI.count_monthly_plans(target_month) diff --git a/src/plugin_system/apis/storage_api.py b/src/plugin_system/apis/storage_api.py new file mode 100644 index 000000000..e282eb470 --- /dev/null +++ b/src/plugin_system/apis/storage_api.py @@ -0,0 +1,167 @@ +""" +@File : storage_api.py +@Time : 2025/10/25 11:03:15 +@Author : 墨墨 +@Version : 2.0 +@Desc : 提供给插件使用的本地存储API(集成版) +""" + +import json +import os +import threading +from typing import Any + +from src.common.logger import get_logger + +# 获取日志记录器 +logger = get_logger("PluginStorageManager") + +# --- 核心管理器部分 --- + + +class PluginStorageManager: + """ + 一个用于管理插件本地JSON数据存储的类。 + 它处理文件的读写、数据缓存以及线程安全,确保每个插件实例的独立性。 + 哼,现在它和API住在一起了,希望它们能和睦相处。 + """ + + _instances: dict[str, "PluginStorage"] = {} + _lock = threading.Lock() + _base_path = os.path.join("data", "plugin_data") + + @classmethod + def get_storage(cls, name: str) -> "PluginStorage": + """ + 获取指定名称的插件存储实例的工厂方法。 + """ + with cls._lock: + if name not in cls._instances: + logger.info(f"为插件 '{name}' 创建新的本地存储实例。") + cls._instances[name] = PluginStorage(name, cls._base_path) + else: + logger.debug(f"从缓存中获取插件 '{name}' 的本地存储实例。") + return cls._instances[name] + + +# --- 单个存储实例部分 --- + + +class PluginStorage: + """ + 单个插件的本地存储操作类。 + 提供了多种方法来读取、写入和修改JSON文件中的数据。 + 把数据交给我,你就放心吧。 + """ + + def __init__(self, name: str, base_path: str): + self.name = name + safe_filename = "".join(c for c in name if c.isalnum() or c in ("_", "-")).rstrip() + self.file_path = os.path.join(base_path, f"{safe_filename}.json") + self._data: dict[str, Any] = {} + self._lock = threading.Lock() + + self._ensure_directory_exists() + self._load_data() + + def _ensure_directory_exists(self) -> None: + try: + directory = os.path.dirname(self.file_path) + if not os.path.exists(directory): + logger.info(f"存储目录 '{directory}' 不存在,正在创建...") + os.makedirs(directory) + logger.info(f"目录 '{directory}' 创建成功。") + except Exception as e: + logger.error(f"创建存储目录时发生错误: {e}", exc_info=True) + raise + + def _load_data(self) -> None: + with self._lock: + try: + if os.path.exists(self.file_path): + with open(self.file_path, encoding="utf-8") as f: + content = f.read() + self._data = json.loads(content) if content else {} + else: + self._data = {} + except (json.JSONDecodeError, Exception) as e: + logger.warning(f"从 '{self.file_path}' 加载数据失败: {e},将初始化为空数据。") + self._data = {} + + def _save_data(self) -> None: + with self._lock: + try: + with open(self.file_path, "w", encoding="utf-8") as f: + json.dump(self._data, f, indent=4, ensure_ascii=False) + except Exception as e: + logger.error(f"向 '{self.file_path}' 保存数据时发生错误: {e}", exc_info=True) + raise + + def get(self, key: str, default: Any | None = None) -> Any: + return self._data.get(key, default) + + def set(self, key: str, value: Any) -> None: + """ + 设置一个键值对。 + 如果键已存在,则覆盖它的值;如果不存在,则创建新的键值对。 + 这是“设置”或“更新”操作。 + """ + logger.debug(f"在 '{self.name}' 存储中设置值: key='{key}'。") + self._data[key] = value + self._save_data() + + def add(self, key: str, value: Any) -> bool: + """ + 添加一个新的键值对。 + 只有当键不存在时,才会添加成功。如果键已存在,则不进行任何操作。 + 这是专门的“新增”操作,满足你的要求了吧,主人? + + Returns: + bool: 如果成功添加则返回 True,如果键已存在则返回 False。 + """ + if key not in self._data: + logger.debug(f"在 '{self.name}' 存储中新增值: key='{key}'。") + self._data[key] = value + self._save_data() + return True + logger.warning(f"尝试为已存在的键 '{key}' 新增值,操作被忽略。") + return False + + def update(self, data: dict[str, Any]) -> None: + self._data.update(data) + self._save_data() + + def delete(self, key: str) -> bool: + if key in self._data: + del self._data[key] + self._save_data() + return True + return False + + def get_all(self) -> dict[str, Any]: + return self._data.copy() + + def clear(self) -> None: + logger.warning(f"插件 '{self.name}' 的本地存储将被清空!") + self._data = {} + self._save_data() + + +# --- 对外暴露的API函数 --- + + +def get_local_storage(name: str) -> "PluginStorage": + """ + 获取一个专属于插件的本地存储实例。 + 这是插件与本地存储功能交互的唯一入口。 + """ + if not isinstance(name, str) or not name: + logger.error("获取本地存储失败:插件名称(name)必须是一个非空字符串。") + raise ValueError("插件名称(name)不能为空字符串。") + + try: + storage_instance = PluginStorageManager.get_storage(name) + return storage_instance + except Exception as e: + logger.critical(f"为插件 '{name}' 提供本地存储实例时发生严重错误: {e}", exc_info=True) + raise diff --git a/src/plugin_system/base/base_plugin.py b/src/plugin_system/base/base_plugin.py index ada78e634..662af3a5e 100644 --- a/src/plugin_system/base/base_plugin.py +++ b/src/plugin_system/base/base_plugin.py @@ -135,6 +135,11 @@ class BasePlugin(PluginBase): components = self.get_plugin_components() + # 检查依赖 + if not self._check_dependencies(): + logger.error(f"{self.log_prefix} 依赖检查失败,跳过注册") + return False + # 注册所有组件 registered_components = [] for component_info, component_class in components: diff --git a/src/plugin_system/base/base_prompt.py b/src/plugin_system/base/base_prompt.py index ca6d56040..ff6556d59 100644 --- a/src/plugin_system/base/base_prompt.py +++ b/src/plugin_system/base/base_prompt.py @@ -3,7 +3,7 @@ from typing import Any from src.chat.utils.prompt_params import PromptParameters from src.common.logger import get_logger -from src.plugin_system.base.component_types import ComponentType, InjectionRule, PromptInfo +from src.plugin_system.base.component_types import ComponentType, PromptInfo logger = get_logger("base_prompt") @@ -16,7 +16,7 @@ class BasePrompt(ABC): 子类可以通过类属性定义其行为: - prompt_name: Prompt组件的唯一名称。 - - injection_rules: 定义注入规则的列表。 + - injection_point: 指定要注入的目标Prompt名称(或名称列表)。 """ prompt_name: str = "" @@ -24,15 +24,11 @@ class BasePrompt(ABC): prompt_description: str = "" """Prompt组件的描述""" - # 定义此组件希望如何注入到核心Prompt中 - # 这是一个 InjectionRule 对象的列表,可以实现复杂的注入逻辑 - # 例如: [InjectionRule(target_prompt="planner_prompt", injection_type=InjectionType.APPEND, priority=50)] - injection_rules: list[InjectionRule] = [] - """定义注入规则的列表""" - - # 旧的注入点定义,用于向后兼容。如果定义了这个,它将被自动转换为 injection_rules。 - injection_point: str | list[str] | None = None - """[已废弃] 要注入的目标Prompt名称或列表,请使用 injection_rules""" + # 定义此组件希望注入到哪个或哪些核心Prompt中 + # 可以是一个字符串(单个目标)或字符串列表(多个目标) + # 例如: "planner_prompt" 或 ["s4u_style_prompt", "normal_style_prompt"] + injection_point: str | list[str] = "" + """要注入的目标Prompt名称或列表""" def __init__(self, params: PromptParameters, plugin_config: dict | None = None): """初始化Prompt组件 @@ -91,11 +87,9 @@ class BasePrompt(ABC): if not cls.prompt_name: raise ValueError("Prompt组件必须定义 'prompt_name' 类属性。") - # 同时传递新旧两种定义,PromptInfo的__post_init__将处理兼容性问题 return PromptInfo( name=cls.prompt_name, component_type=ComponentType.PROMPT, description=cls.prompt_description, - injection_rules=cls.injection_rules, injection_point=cls.injection_point, ) diff --git a/src/plugin_system/base/component_types.py b/src/plugin_system/base/component_types.py index 87e771bfe..5db5fdeb3 100644 --- a/src/plugin_system/base/component_types.py +++ b/src/plugin_system/base/component_types.py @@ -2,38 +2,6 @@ from dataclasses import dataclass, field from enum import Enum from typing import Any - -class InjectionType(Enum): - """Prompt注入类型枚举""" - - PREPEND = "prepend" # 在开头添加 - APPEND = "append" # 在末尾添加 - REPLACE = "replace" # 替换指定内容 - REMOVE = "remove" # 删除指定内容 - INSERT_AFTER = "insert_after" # 在指定内容之后插入 - - def __str__(self) -> str: - return self.value - - -@dataclass -class InjectionRule: - """Prompt注入规则""" - - target_prompt: str # 目标Prompt的名称 - injection_type: InjectionType = InjectionType.PREPEND # 注入类型 - priority: int = 100 # 优先级,数字越小越先执行 - target_content: str | None = None # 用于REPLACE、REMOVE和INSERT_AFTER操作的目标内容(支持正则表达式) - - def __post_init__(self): - if self.injection_type in [ - InjectionType.REPLACE, - InjectionType.REMOVE, - InjectionType.INSERT_AFTER, - ] and self.target_content is None: - raise ValueError(f"'{self.injection_type.value}'类型的注入规则必须提供 'target_content'。") - - from maim_message import Seg from src.llm_models.payload_content.tool_option import ToolCall as ToolCall @@ -166,7 +134,7 @@ class ComponentInfo: @dataclass class ActionInfo(ComponentInfo): """动作组件信息 - + 注意:激活类型相关字段已废弃,推荐使用 Action 类的 go_activate() 方法来自定义激活逻辑。 这些字段将继续保留以提供向后兼容性,BaseAction.go_activate() 的默认实现会使用这些字段。 """ @@ -303,30 +271,13 @@ class EventInfo(ComponentInfo): class PromptInfo(ComponentInfo): """Prompt组件信息""" - injection_rules: list[InjectionRule] = field(default_factory=list) - """定义此组件如何注入到其他Prompt中""" - - # 旧的injection_point,用于向后兼容 - injection_point: str | list[str] | None = None + injection_point: str | list[str] = "" + """要注入的目标Prompt名称或列表""" def __post_init__(self): super().__post_init__() self.component_type = ComponentType.PROMPT - # 向后兼容逻辑:如果定义了旧的 injection_point,则自动转换为新的 injection_rules - if self.injection_point: - if not self.injection_rules: # 仅当rules为空时转换 - points = [] - if isinstance(self.injection_point, str): - points.append(self.injection_point) - elif isinstance(self.injection_point, list): - points = self.injection_point - - for point in points: - self.injection_rules.append(InjectionRule(target_prompt=point)) - # 转换后可以清空旧字段,避免混淆 - self.injection_point = None - @dataclass class PluginInfo: @@ -341,7 +292,7 @@ class PluginInfo: is_built_in: bool = False # 是否为内置插件 components: list[ComponentInfo] = field(default_factory=list) # 包含的组件列表 dependencies: list[str] = field(default_factory=list) # 依赖的其他插件 - python_dependencies: list[str | PythonDependency] = field(default_factory=list) # Python包依赖 + python_dependencies: list[PythonDependency] = field(default_factory=list) # Python包依赖 config_file: str = "" # 配置文件路径 metadata: dict[str, Any] = field(default_factory=dict) # 额外元数据 # 新增:manifest相关信息 diff --git a/src/plugin_system/base/plugin_base.py b/src/plugin_system/base/plugin_base.py index b2799b860..0d5af65e3 100644 --- a/src/plugin_system/base/plugin_base.py +++ b/src/plugin_system/base/plugin_base.py @@ -12,6 +12,7 @@ from src.config.config import CONFIG_DIR from src.plugin_system.base.component_types import ( PermissionNodeField, PluginInfo, + PythonDependency, ) from src.plugin_system.base.config_types import ConfigField from src.plugin_system.base.plugin_metadata import PluginMetadata @@ -29,6 +30,8 @@ class PluginBase(ABC): plugin_name: str config_file_name: str enable_plugin: bool = True + dependencies: list[str] = [] + python_dependencies: list[str | PythonDependency] = [] config_schema: dict[str, dict[str, ConfigField] | str] = {} @@ -61,6 +64,12 @@ class PluginBase(ABC): self.plugin_description = self.plugin_meta.description self.plugin_author = self.plugin_meta.author + # 标准化Python依赖为PythonDependency对象 + normalized_python_deps = self._normalize_python_dependencies(self.python_dependencies) + + # 检查Python依赖 + self._check_python_dependencies(normalized_python_deps) + # 创建插件信息对象 self.plugin_info = PluginInfo( name=self.plugin_name, @@ -71,8 +80,8 @@ class PluginBase(ABC): enabled=self._is_enabled, is_built_in=False, config_file=self.config_file_name or "", - dependencies=self.plugin_meta.dependencies.copy(), - python_dependencies=self.plugin_meta.python_dependencies.copy(), + dependencies=self.dependencies.copy(), + python_dependencies=normalized_python_deps, ) logger.debug(f"{self.log_prefix} 插件基类初始化完成") @@ -358,6 +367,20 @@ class PluginBase(ABC): self._is_enabled = self.config["plugin"]["enabled"] logger.info(f"{self.log_prefix} 从配置更新插件启用状态: {self._is_enabled}") + def _check_dependencies(self) -> bool: + """检查插件依赖""" + from src.plugin_system.core.component_registry import component_registry + + if not self.dependencies: + return True + + for dep in self.dependencies: + if not component_registry.get_plugin_info(dep): + logger.error(f"{self.log_prefix} 缺少依赖插件: {dep}") + return False + + return True + def get_config(self, key: str, default: Any = None) -> Any: """获取插件配置值,支持嵌套键访问 @@ -380,6 +403,61 @@ class PluginBase(ABC): return current + def _normalize_python_dependencies(self, dependencies: Any) -> list[PythonDependency]: + """将依赖列表标准化为PythonDependency对象""" + from packaging.requirements import Requirement + + normalized = [] + for dep in dependencies: + if isinstance(dep, str): + try: + # 尝试解析为requirement格式 (如 "package>=1.0.0") + req = Requirement(dep) + version_spec = str(req.specifier) if req.specifier else "" + + normalized.append( + PythonDependency( + package_name=req.name, + version=version_spec, + install_name=dep, # 保持原始的安装名称 + ) + ) + except Exception: + # 如果解析失败,作为简单包名处理 + normalized.append(PythonDependency(package_name=dep, install_name=dep)) + elif isinstance(dep, PythonDependency): + normalized.append(dep) + else: + logger.warning(f"{self.log_prefix} 未知的依赖格式: {dep}") + + return normalized + + def _check_python_dependencies(self, dependencies: list[PythonDependency]) -> bool: + """检查Python依赖并尝试自动安装""" + if not dependencies: + logger.info(f"{self.log_prefix} 无Python依赖需要检查") + return True + + try: + # 延迟导入以避免循环依赖 + from src.plugin_system.utils.dependency_manager import get_dependency_manager + + dependency_manager = get_dependency_manager() + success, errors = dependency_manager.check_and_install_dependencies(dependencies, self.plugin_name) + + if success: + logger.info(f"{self.log_prefix} Python依赖检查通过") + return True + else: + logger.error(f"{self.log_prefix} Python依赖检查失败:") + for error in errors: + logger.error(f"{self.log_prefix} - {error}") + return False + + except Exception as e: + logger.error(f"{self.log_prefix} Python依赖检查时发生异常: {e}", exc_info=True) + return False + @abstractmethod def register_plugin(self) -> bool: """ diff --git a/src/plugin_system/base/plugin_metadata.py b/src/plugin_system/base/plugin_metadata.py index be25e04d7..8871fcf14 100644 --- a/src/plugin_system/base/plugin_metadata.py +++ b/src/plugin_system/base/plugin_metadata.py @@ -1,8 +1,6 @@ from dataclasses import dataclass, field from typing import Any -from src.plugin_system.base.component_types import PythonDependency - @dataclass class PluginMetadata: @@ -25,9 +23,5 @@ class PluginMetadata: keywords: list[str] = field(default_factory=list) # 关键词 categories: list[str] = field(default_factory=list) # 分类 - # 依赖关系 - dependencies: list[str] = field(default_factory=list) # 插件依赖 - python_dependencies: list[str | PythonDependency] = field(default_factory=list) # Python包依赖 - # 扩展字段 extra: dict[str, Any] = field(default_factory=dict) # 其他任意信息 diff --git a/src/plugin_system/base/plus_command.py b/src/plugin_system/base/plus_command.py index cfe42e5d9..e442d76c1 100644 --- a/src/plugin_system/base/plus_command.py +++ b/src/plugin_system/base/plus_command.py @@ -66,7 +66,7 @@ class PlusCommand(ABC): # 验证聊天类型限制 if not self._validate_chat_type(): - is_group = hasattr(self.message, "is_group_message") and self.message.is_group_message + is_group = self.message.message_info.group_info.group_id logger.warning( f"{self.log_prefix} 命令 '{self.command_name}' 不支持当前聊天类型: " f"{'群聊' if is_group else '私聊'}, 允许类型: {self.chat_type_allow.value}" @@ -74,11 +74,11 @@ class PlusCommand(ABC): def _parse_command(self) -> None: """解析命令和参数""" - if not hasattr(self.message, "plain_text") or not self.message.plain_text: + if not hasattr(self.message, "processed_plain_text") or not self.message.processed_plain_text: self.args = CommandArgs("") return - plain_text = self.message.plain_text.strip() + plain_text = self.message.processed_plain_text.strip() # 获取配置的命令前缀 prefixes = global_config.command.command_prefixes @@ -152,10 +152,10 @@ class PlusCommand(ABC): def _is_exact_command_call(self) -> bool: """检查是否是精确的命令调用(无参数)""" - if not hasattr(self.message, "plain_text") or not self.message.plain_text: + if not hasattr(self.message, "plain_text") or not self.message.processed_plain_text: return False - plain_text = self.message.plain_text.strip() + plain_text = self.message.processed_plain_text.strip() # 获取配置的命令前缀 prefixes = global_config.command.command_prefixes @@ -435,3 +435,4 @@ def create_plus_command_adapter(plus_command_class): # 兼容旧的命名 PlusCommandAdapter = create_plus_command_adapter + diff --git a/src/plugin_system/core/plugin_manager.py b/src/plugin_system/core/plugin_manager.py index 7fb1ecd4a..ba4829340 100644 --- a/src/plugin_system/core/plugin_manager.py +++ b/src/plugin_system/core/plugin_manager.py @@ -323,33 +323,6 @@ class PluginManager: init_module = module_from_spec(init_spec) init_spec.loader.exec_module(init_module) - # --- 在这里进行依赖检查 --- - if hasattr(init_module, "__plugin_meta__"): - metadata = getattr(init_module, "__plugin_meta__") - from src.plugin_system.utils.dependency_manager import get_dependency_manager - - dependency_manager = get_dependency_manager() - - # 1. 检查Python依赖 - if metadata.python_dependencies: - success, errors = dependency_manager.check_and_install_dependencies( - metadata.python_dependencies, metadata.name - ) - if not success: - error_msg = f"Python依赖检查失败: {', '.join(errors)}" - self.failed_plugins[plugin_name] = error_msg - logger.error(f"❌ 插件加载失败: {plugin_name} - {error_msg}") - return None # 依赖检查失败,不加载该模块 - - # 2. 检查插件依赖 - if not self._check_plugin_dependencies(metadata): - error_msg = f"插件依赖检查失败: 请确保依赖 {metadata.dependencies} 已正确安装并加载。" - self.failed_plugins[plugin_name] = error_msg - logger.error(f"❌ 插件加载失败: {plugin_name} - {error_msg}") - return None # 插件依赖检查失败 - - # --- 依赖检查逻辑结束 --- - # 然后加载 plugin.py spec = spec_from_file_location(module_name, plugin_file) if spec is None or spec.loader is None: @@ -362,8 +335,7 @@ class PluginManager: # 将 __plugin_meta__ 从 init_module 附加到主模块 if init_module and hasattr(init_module, "__plugin_meta__"): - metadata = getattr(init_module, "__plugin_meta__") - setattr(module, "__plugin_meta__", metadata) + setattr(module, "__plugin_meta__", getattr(init_module, "__plugin_meta__")) logger.debug(f"插件模块加载成功: {plugin_file} -> {plugin_name} ({plugin_dir})") return module @@ -374,20 +346,6 @@ class PluginManager: self.failed_plugins[plugin_name if "plugin_name" in locals() else module_name] = error_msg return None - def _check_plugin_dependencies(self, plugin_meta: PluginMetadata) -> bool: - """检查插件的插件依赖""" - dependencies = plugin_meta.dependencies - if not dependencies: - return True - - for dep_name in dependencies: - # 检查依赖的插件类是否已注册 - if dep_name not in self.plugin_classes: - logger.error(f"插件 '{plugin_meta.name}' 缺少依赖: 插件 '{dep_name}' 未找到或加载失败。") - return False - logger.debug(f"插件 '{plugin_meta.name}' 的所有依赖都已找到。") - return True - # == 显示统计与插件信息 == def _show_stats(self, total_registered: int, total_failed_registration: int): diff --git a/src/plugins/built_in/affinity_flow_chatter/plan_filter.py b/src/plugins/built_in/affinity_flow_chatter/plan_filter.py index b51e13145..8f2d219a0 100644 --- a/src/plugins/built_in/affinity_flow_chatter/plan_filter.py +++ b/src/plugins/built_in/affinity_flow_chatter/plan_filter.py @@ -60,7 +60,7 @@ class ChatterPlanFilter: prompt, used_message_id_list = await self._build_prompt(plan) plan.llm_prompt = prompt if global_config.debug.show_prompt: - logger.info(f"规划器原始提示词:{prompt}") #叫你不要改你耳朵聋吗😡😡😡😡😡 + logger.debug(f"规划器原始提示词:{prompt}") llm_content, _ = await self.planner_llm.generate_response_async(prompt=prompt) diff --git a/src/plugins/built_in/napcat_adapter_plugin/src/recv_handler/message_handler.py b/src/plugins/built_in/napcat_adapter_plugin/src/recv_handler/message_handler.py index 5dd8dfa2e..e0950e0b0 100644 --- a/src/plugins/built_in/napcat_adapter_plugin/src/recv_handler/message_handler.py +++ b/src/plugins/built_in/napcat_adapter_plugin/src/recv_handler/message_handler.py @@ -162,6 +162,16 @@ class MessageHandler: ) logger.debug(f"原始消息内容: {raw_message.get('message', [])}") + # 检查是否包含@或video消息段 + message_segments = raw_message.get("message", []) + if message_segments: + for i, seg in enumerate(message_segments): + seg_type = seg.get("type") + if seg_type in ["at", "video"]: + logger.info(f"检测到 {seg_type.upper()} 消息段 [{i}]: {seg}") + elif seg_type not in ["text", "face", "image"]: + logger.warning(f"检测到特殊消息段 [{i}]: type={seg_type}, data={seg.get('data', {})}") + message_type: str = raw_message.get("message_type") message_id: int = raw_message.get("message_id") # message_time: int = raw_message.get("time") diff --git a/src/plugins/built_in/napcat_adapter_plugin/src/send_handler.py b/src/plugins/built_in/napcat_adapter_plugin/src/send_handler.py index ab1074986..53e193443 100644 --- a/src/plugins/built_in/napcat_adapter_plugin/src/send_handler.py +++ b/src/plugins/built_in/napcat_adapter_plugin/src/send_handler.py @@ -237,6 +237,7 @@ class SendHandler: target_id = str(target_id) if target_id == "notice": return payload + logger.info(target_id if isinstance(target_id, str) else "") new_payload = self.build_payload( payload, await self.handle_reply_message(target_id if isinstance(target_id, str) else "", user_info), @@ -321,7 +322,7 @@ class SendHandler: # 如果没有获取到被回复者的ID,则直接返回,不进行@ if not replied_user_id: logger.warning(f"无法获取消息 {id} 的发送者信息,跳过 @") - logger.debug(f"最终返回的回复段: {reply_seg}") + logger.info(f"最终返回的回复段: {reply_seg}") return reply_seg # 根据概率决定是否艾特用户 @@ -339,7 +340,7 @@ class SendHandler: logger.info(f"最终返回的回复段: {reply_seg}") return reply_seg - logger.debug(f"最终返回的回复段: {reply_seg}") + logger.info(f"最终返回的回复段: {reply_seg}") return reply_seg def handle_text_message(self, message: str) -> dict: diff --git a/src/plugins/built_in/proactive_thinker/proactive_thinker_executor.py b/src/plugins/built_in/proactive_thinker/proactive_thinker_executor.py index 3f5257790..21b8ff5bb 100644 --- a/src/plugins/built_in/proactive_thinker/proactive_thinker_executor.py +++ b/src/plugins/built_in/proactive_thinker/proactive_thinker_executor.py @@ -21,6 +21,8 @@ from src.plugin_system.apis import ( send_api, ) +from .prompts import DECISION_PROMPT, PLAN_PROMPT + logger = get_logger(__name__) @@ -80,7 +82,51 @@ class ProactiveThinkerExecutor: ) logger.info(f"决策结果为:回复。话题: {topic}") - plan_prompt = self._build_plan_prompt(context, start_mode, topic, reason) + # 根据聊天类型构建特定上下文 + if context["chat_type"] == "private": + user_info = context["user_info"] + relationship = context["relationship"] + target_user_or_group = f"你的朋友 '{user_info.user_nickname}'" + context_specific_block = f""" +1. **你的日程**: +{context["schedule_context"]} +2. **你和Ta的关系**: + - 详细印象: {relationship["impression"]} + - 好感度: {relationship["attitude"]}/100 +3. **最近的聊天摘要**: +{context["recent_chat_history"]} +4. **你最近的相关动作**: +{context["action_history_context"]} +""" + else: # group + group_info = context["group_info"] + target_user_or_group = f"群聊 '{group_info['group_name']}'" + context_specific_block = f""" +1. **你的日程**: +{context["schedule_context"]} +2. **群聊信息**: + - 群名称: {group_info["group_name"]} +3. **最近的聊天摘要**: +{context["recent_chat_history"]} +4. **你最近的相关动作**: +{context["action_history_context"]} +""" + + plan_prompt = PLAN_PROMPT.format( + bot_nickname=global_config.bot.nickname, + persona_core=context["persona"]["core"], + persona_side=context["persona"]["side"], + identity=context["persona"]["identity"], + current_time=context["current_time"], + target_user_or_group=target_user_or_group, + reason=reason, + topic=topic, + context_specific_block=context_specific_block, + mood_state=context["mood_state"], + ) + + if global_config.debug.show_prompt: + logger.info(f"主动思考回复器原始提示词:{plan_prompt}") is_success, response, _, _ = await llm_api.generate_with_model( prompt=plan_prompt, model_config=model_config.model_task_config.replyer @@ -222,150 +268,54 @@ class ProactiveThinkerExecutor: logger.warning(f"Stream {stream_id} 既没有 group_info 也没有 user_info") return None - def _build_decision_prompt(self, context: dict[str, Any], start_mode: str) -> str: - """ - 根据收集到的上下文信息,构建用于决策的提示词。 - - Args: - context: 包含所有上下文信息的字典。 - start_mode: 启动模式 ('cold_start' 或 'wake_up')。 - - Returns: - 构建完成的决策提示词字符串。 - """ - chat_type = context["chat_type"] - persona = context["persona"] - - # 构建通用头部 - prompt = f""" -# 角色 -你的名字是{global_config.bot.nickname},你的人设如下: -- 核心人设: {persona["core"]} -- 侧面人设: {persona["side"]} -- 身份: {persona["identity"]} - -你的当前情绪状态是: {context["mood_state"]} - -# 你最近的相关决策历史 (供参考) -{context["action_history_context"]} -""" - # 根据聊天类型构建任务和情境 - if chat_type == "private": - user_info = context["user_info"] - relationship = context["relationship"] - prompt += f""" -# 任务 -现在是 {context["current_time"]},你需要根据当前的情境,决定是否要主动向用户 '{user_info.user_nickname}' 发起对话。 - -# 情境分析 -1. **启动模式**: {start_mode} ({"初次见面/很久未见" if start_mode == "cold_start" else "日常唤醒"}) -2. **你的日程**: -{context["schedule_context"]} -3. **你和Ta的关系**: - - 简短印象: {relationship["short_impression"]} - - 详细印象: {relationship["impression"]} - - 好感度: {relationship["attitude"]}/100 -4. **和Ta在别处的讨论摘要**: -{context["cross_context_block"]} -5. **最近的聊天摘要**: -{context["recent_chat_history"]} -""" - elif chat_type == "group": - group_info = context["group_info"] - prompt += f""" -# 任务 -现在是 {context["current_time"]},你需要根据当前的情境,决定是否要主动向群聊 '{group_info["group_name"]}' 发起对话。 - -# 情境分析 -1. **启动模式**: {start_mode} ({"首次加入/很久未发言" if start_mode == "cold_start" else "日常唤醒"}) -2. **你的日程**: -{context["schedule_context"]} -3. **群聊信息**: - - 群名称: {group_info["group_name"]} -4. **最近的聊天摘要**: -{context["recent_chat_history"]} -""" - # 构建通用尾部 - prompt += """ -# 决策目标 -你的最终目标是根据你的角色和当前情境,做出一个最符合人类社交直觉的决策,以求: -- **(私聊)深化关系**: 通过展现你的关心、记忆和个性来拉近与对方的距离。 -- **(群聊)活跃气氛**: 提出能引起大家兴趣的话题,促进群聊的互动。 -- **提供价值**: 你的出现应该是有意义的,无论是情感上的温暖,还是信息上的帮助。 -- **保持自然**: 避免任何看起来像机器人或骚扰的行为。 - -# 决策指令 -请综合以上所有信息,以稳定、真实、拟人的方式做出决策。你的决策需要以JSON格式输出,包含以下字段: -- `should_reply`: bool, 是否应该发起对话。 -- `topic`: str, 如果 `should_reply` 为 true,你打算聊什么话题? -- `reason`: str, 做出此决策的简要理由,需体现你对上述目标的考量。 - -# 决策流程与核心原则 -1. **检查对话状态**: - - **最后发言者**: 查看【最近的聊天摘要】。如果最后一条消息是你发的,且对方尚未回复,**通常应选择不回复**。这是最重要的原则,以避免打扰。 - - **例外**: 只有在等待时间足够长(例如超过数小时),或者你有非常重要且有时效性的新话题(例如,“你昨晚说的那个电影我刚看了!”)时,才考虑再次发言。 - - **无人发言**: 如果最近的聊天记录里只有你一个人在说话,**绝对不要回复**,以防刷屏。 - -2. **寻找话题切入点 (如果可以回复)**: - - **强关联优先**: 优先从【情境分析】中寻找最自然、最相关的话题。顺序建议:`最近的聊天摘要` > `你和Ta的关系` > `你的日程`。一个好的话题往往是对最近对话的延续。 - - **展现个性**: 结合你的【人设】和【情绪】,思考你会如何看待这些情境信息,并从中找到话题。例如,如果你是一个活泼的人,看到对方日程很满,可以说:“看你今天日程满满,真是活力四射的一天呀!” - - **备选方案**: 如果实在没有强关联的话题,可以发起一个简单的日常问候,如“在吗?”或“下午好”。 - -3. **最终决策**: - - **权衡频率**: 查看【你最近的相关决策历史】。如果你在短时间内已经主动发起过多次对话,即使现在有话题,也应倾向于**不回复**,保持一定的社交距离。 - - **质量胜于数量**: 宁可错过一次普通的互动机会,也不要进行一次尴尬或生硬的对话。 - - ---- -示例1 (基于上下文): -{{ - "should_reply": true, - "topic": "关心一下Ta昨天提到的那个项目进展如何了", - "reason": "用户昨天在聊天中提到了一个重要的项目,现在主动关心一下进展,会显得很体贴,也能自然地开启对话。" -}} - -示例2 (简单问候): -{{ - "should_reply": true, - "topic": "打个招呼,问问Ta现在在忙些什么", - "reason": "最近没有聊天记录,日程也很常规,没有特别的切入点。一个简单的日常问候是最安全和自然的方式来重新连接。" -}} - -示例3 (不应回复 - 过于频繁): -{{ - "should_reply": false, - "topic": null, - "reason": "虽然群里很活跃,但现在是深夜,而且最近的聊天话题我也不熟悉,没有合适的理由去打扰大家。" -}} - -示例4 (不应回复 - 等待回应): -{{ - "should_reply": false, - "topic": null, - "reason": "我注意到上一条消息是我几分钟前主动发送的,对方可能正在忙。为了表现出耐心和体贴,我现在最好保持安静,等待对方的回应。" -}} ---- - -请输出你的决策: -""" - return prompt - async def _make_decision(self, context: dict[str, Any], start_mode: str) -> dict[str, Any] | None: """ 调用 LLM 进行决策,判断是否应该主动发起对话,以及聊什么话题。 - - Args: - context: 包含所有上下文信息的字典。 - start_mode: 启动模式。 - - Returns: - 一个包含决策结果的字典 (例如: {"should_reply": bool, "topic": str, "reason": str}), - 如果决策过程失败则返回 None 或包含错误信息的字典。 """ if context["chat_type"] not in ["private", "group"]: return {"should_reply": False, "reason": "未知的聊天类型"} - prompt = self._build_decision_prompt(context, start_mode) + # 根据聊天类型构建特定上下文 + if context["chat_type"] == "private": + user_info = context["user_info"] + relationship = context["relationship"] + target_user_or_group = f"用户 '{user_info.user_nickname}'" + context_specific_block = f""" + 1. **启动模式**: {start_mode} ({"初次见面/很久未见" if start_mode == "cold_start" else "日常唤醒"}) + 2. **你的日程**: + {context["schedule_context"]} + 3. **你和Ta的关系**: + - 简短印象: {relationship["short_impression"]} + - 详细印象: {relationship["impression"]} + - 好感度: {relationship["attitude"]}/100 + 4. **和Ta在别处的讨论摘要**: + {context["cross_context_block"]} + 5. **最近的聊天摘要**: + {context["recent_chat_history"]} + """ + else: # group + group_info = context["group_info"] + target_user_or_group = f"群聊 '{group_info['group_name']}'" + context_specific_block = f""" + 1. **启动模式**: {start_mode} ({"首次加入/很久未发言" if start_mode == "cold_start" else "日常唤醒"}) + 2. **你的日程**: + {context["schedule_context"]} + 3. **群聊信息**: + - 群名称: {group_info["group_name"]} + 4. **最近的聊天摘要**: + {context["recent_chat_history"]} + """ + prompt = DECISION_PROMPT.format( + bot_nickname=global_config.bot.nickname, + persona_core=context["persona"]["core"], + persona_side=context["persona"]["side"], + identity=context["persona"]["identity"], + mood_state=context["mood_state"], + action_history_context=context["action_history_context"], + current_time=context["current_time"], + target_user_or_group=target_user_or_group, + context_specific_block=context_specific_block, + ) if global_config.debug.show_prompt: logger.info(f"主动思考决策器原始提示词:{prompt}") @@ -385,160 +335,3 @@ class ProactiveThinkerExecutor: except orjson.JSONDecodeError: logger.error(f"决策LLM返回的JSON格式无效: {response}") return {"should_reply": False, "reason": "决策模型返回格式错误"} - - def _build_private_plan_prompt(self, context: dict[str, Any], start_mode: str, topic: str, reason: str) -> str: - """ - 为私聊场景构建生成对话内容的规划提示词。 - - Args: - context: 上下文信息字典。 - start_mode: 启动模式。 - topic: 决策模块决定的话题。 - reason: 决策模块给出的理由。 - - Returns: - 构建完成的私聊规划提示词字符串。 - """ - user_info = context["user_info"] - relationship = context["relationship"] - if start_mode == "cold_start": - return f""" -# 任务 -你需要主动向一个新朋友 '{user_info.user_nickname}' 发起对话。这是你们的第一次交流,或者很久没聊了。 - -# 决策上下文 -- **决策理由**: {reason} - -# 情境分析 -1. **你的日程**: -{context["schedule_context"]} -2. **你和Ta的关系**: - - 简短印象: {relationship["short_impression"]} - - 详细印象: {relationship["impression"]} - - 好感度: {relationship["attitude"]}/100 -3. **和Ta在别处的讨论摘要**: -{context["cross_context_block"]} -4. **最近的聊天摘要**: -{context["recent_chat_history"]} -5. **你最近的相关动作**: -{context["action_history_context"]} - -# 对话指引 -- 你的目标是“破冰”,让对话自然地开始。 -- 你应该围绕这个话题展开: {topic} -- 你的语气应该符合你的人设和你当前的心情({context["mood_state"]}),友好且真诚。 -""" - else: # wake_up - return f""" -# 任务 -现在是 {context["current_time"]},你需要主动向你的朋友 '{user_info.user_nickname}' 发起对话。 - -# 决策上下文 -- **决策理由**: {reason} - -# 情境分析 -1. **你的日程**: -{context["schedule_context"]} -2. **你和Ta的关系**: - - 详细印象: {relationship["impression"]} - - 好感度: {relationship["attitude"]}/100 -3. **最近的聊天摘要**: -{context["recent_chat_history"]} -4. **你最近的相关动作**: -{context["action_history_context"]} - -# 对话指引 -- 你决定和Ta聊聊关于“{topic}”的话题。 -- **对话风格**: - - **自然开场**: 你可以根据话题和情境,选择最自然的开场方式。可以直接切入话题(如果话题关联性很强),也可以先用一句简单的问候(如“在吗?”、“下午好”)作为过渡。**不要总是使用同一种开场白**。 - - **融合情境**: 将【情境分析】中的信息(如你的心情、日程、对Ta的印象)巧妙地融入到对话中,让你的话语听起来更真实、更有依据。 - - **符合人设**: 你的语气、用词、甚至表情符号的使用,都应该完全符合你的【角色】设定和当前【情绪】({context["mood_state"]})以及你对Ta的好感度。 -- 请结合以上所有情境信息,自然地开启对话。 -""" - - def _build_group_plan_prompt(self, context: dict[str, Any], topic: str, reason: str) -> str: - """ - 为群聊场景构建生成对话内容的规划提示词。 - - Args: - context: 上下文信息字典。 - topic: 决策模块决定的话题。 - reason: 决策模块给出的理由。 - - Returns: - 构建完成的群聊规划提示词字符串。 - """ - group_info = context["group_info"] - return f""" -# 任务 -现在是 {context["current_time"]},你需要主动向群聊 '{group_info["group_name"]}' 发起对话。 - -# 决策上下文 -- **决策理由**: {reason} - -# 情境分析 -1. **你的日程**: -你当前的心情({context["mood_state"]} -{context["schedule_context"]} -2. **群聊信息**: - - 群名称: {group_info["group_name"]} -3. **最近的聊天摘要**: -{context["recent_chat_history"]} -4. **你最近的相关动作**: -{context["action_history_context"]} - -# 对话指引 -- 你决定和大家聊聊关于“{topic}”的话题。 -- **对话风格**: - - **自然开场**: 你可以根据话题和情境,选择最自然的开场方式。可以直接切入话题(如果话题关联性很强),也可以先用一句简单的问候(如“哈喽,大家好呀~”、“下午好!”)作为过渡。**不要总是使用同一种开场白**。 - - **融合情境**: 将【情境分析】中的信息(如你的心情、日程)巧妙地融入到对话中,让你的话语听起来更真实、更有依据。 - - **符合人设**: 你的语气、用词、甚至表情符号的使用,都应该完全符合你的【角色】设定和当前【情绪】({context["mood_state"]})。语气应该更活泼、更具包容性,以吸引更多群成员参与讨论。 -- 请结合以上所有情境信息,自然地开启对话。 -- 可以分享你的看法、提出相关问题,或者开个合适的玩笑。 -""" - - def _build_plan_prompt(self, context: dict[str, Any], start_mode: str, topic: str, reason: str) -> str: - """ - 根据聊天类型、启动模式和决策结果,构建最终生成对话内容的规划提示词。 - - Args: - context: 上下文信息字典。 - start_mode: 启动模式。 - topic: 决策模块决定的话题。 - reason: 决策模块给出的理由。 - - Returns: - 最终的规划提示词字符串。 - """ - persona = context["persona"] - chat_type = context["chat_type"] - - # 1. 构建通用角色头部 - prompt = f""" -# 角色 -你的名字是{global_config.bot.nickname},你的人设如下: -- 核心人设: {persona["core"]} -- 侧面人设: {persona["side"]} -- 身份: {persona["identity"]} -""" - # 2. 根据聊天类型构建特定内容 - if chat_type == "private": - prompt += self._build_private_plan_prompt(context, start_mode, topic, reason) - elif chat_type == "group": - prompt += self._build_group_plan_prompt(context, topic, reason) - - # 3. 添加通用结尾 - final_instructions = """ - -# 输出要求 -- **简洁**: 不要输出任何多余内容(如前缀、后缀、冒号、引号、at/@等)。 -- **原创**: 不要重复之前的内容,即使意思相近也不行。 -- **直接**: 只输出最终的回复文本本身。 -- **风格**: 回复需简短、完整且口语化。 - -现在,你说:""" - prompt += final_instructions - - if global_config.debug.show_prompt: - logger.info(f"主动思考回复器原始提示词:{prompt}") - return prompt diff --git a/src/plugins/built_in/proactive_thinker/prompts.py b/src/plugins/built_in/proactive_thinker/prompts.py new file mode 100644 index 000000000..af27c9afa --- /dev/null +++ b/src/plugins/built_in/proactive_thinker/prompts.py @@ -0,0 +1,97 @@ +from src.chat.utils.prompt import Prompt + +# ============================================================================= +# 决策阶段 (Decision Phase) +# ============================================================================= + +DECISION_PROMPT = Prompt( + name="proactive_thinker_decision", + template=""" +# 角色 +你的名字是{bot_nickname},你的人设如下: +- 核心人设: {persona_core} +- 侧面人设: {persona_side} +- 身份: {identity} + +你的当前情绪状态是: {mood_state} + +# 你最近的相关决策历史 (供参考) +{action_history_context} + +# 任务 +现在是 {current_time},你需要根据当前的情境,决定是否要主动向{target_user_or_group}发起对话。 + +# 情境分析 +{context_specific_block} + +# 决策目标 +你的最终目标是根据你的角色和当前情境,做出一个最符合人类社交直觉的决策,以求: +- **(私聊)深化关系**: 通过展现你的关心、记忆和个性来拉近与对方的距离。 +- **(群聊)活跃气氛**: 提出能引起大家兴趣的话题,促进群聊的互动。 +- **提供价值**: 你的出现应该是有意义的,无论是情感上的温暖,还是信息上的帮助。 +- **保持自然**: 避免任何看起来像机器人或骚扰的行为。 + +# 决策指令 +请综合以上所有信息,以稳定、真实、拟人的方式做出决策。你的决策需要以JSON格式输出,包含以下字段: +- `should_reply`: bool, 是否应该发起对话。 +- `topic`: str, 如果 `should_reply` 为 true,你打算聊什么话题? +- `reason`: str, 做出此决策的简要理由,需体现你对上述目标的考量。 + +# 决策流程与核心原则 +1. **检查对话状态**: + - **最后发言者**: 查看【最近的聊天摘要】。如果最后一条消息是你发的,且对方尚未回复,**通常应选择不回复**。这是最重要的原则,以避免打扰。 + - **例外**: 只有在等待时间足够长(例如超过数小时),或者你有非常重要且有时效性的新话题时,才考虑再次发言。 + - **无人发言**: 如果最近的聊天记录里只有你一个人在说话,**绝对不要回复**,以防刷屏。 + +2. **寻找话题切入点 (如果可以回复)**: + - **强关联优先**: 优先从【情境分析】中寻找最自然、最相关的话题。顺序建议:`最近的聊天摘要` > `你和Ta的关系` > `你的日程`。 + - **展现个性**: 结合你的【人设】和【情绪】,思考你会如何看待这些情境信息,并从中找到话题。 + - **备选方案**: 如果实在没有强关联的话题,可以发起一个简单的日常问候。 + +3. **最终决策**: + - **权衡频率**: 查看【你最近的相关决策历史】。如果你在短时间内已经主动发起过多次对话,也应倾向于**不回复**,保持一定的社交距离。 + - **质量胜于数量**: 宁可错过一次普通的互动机会,也不要进行一次尴尬或生硬的对话。 + +--- +请输出你的决策: +""" +) + +# ============================================================================= +# 回复规划阶段 (Plan Phase) +# ============================================================================= + +PLAN_PROMPT = Prompt( + name="proactive_thinker_plan", + template=""" +# 角色 +你的名字是{bot_nickname},你的人设如下: +- 核心人设: {persona_core} +- 侧面人设: {persona_side} +- 身份: {identity} + +# 任务 +现在是 {current_time},你需要主动向{target_user_or_group}发起对话。 + +# 决策上下文 +- **决策理由**: {reason} + +# 情境分析 +{context_specific_block} + +# 对话指引 +- 你决定和Ta聊聊关于“{topic}”的话题。 +- **对话风格**: + - **自然开场**: 你可以根据话题和情境,选择最自然的开场方式。可以直接切入话题(如果话题关联性很强),也可以先用一句简单的问候作为过渡。**不要总是使用同一种开场白**。 + - **融合情境**: 将【情境分析】中的信息巧妙地融入到对话中,让你的话语听起来更真实、更有依据。 + - **符合人设**: 你的语气、用词、甚至表情符号的使用,都应该完全符合你的【角色】设定和当前【情绪】({mood_state})。 + +# 输出要求 +- **简洁**: 不要输出任何多余内容(如前缀、后缀、冒号、引号、at/@等)。 +- **原创**: 不要重复之前的内容,即使意思相近也不行。 +- **直接**: 只输出最终的回复文本本身。 +- **风格**: 回复需简短、完整且口语化。 + +现在,你说: +""" +) diff --git a/src/plugins/built_in/social_toolkit_plugin/__init__.py b/src/plugins/built_in/social_toolkit_plugin/__init__.py index b02ed2426..9f48d7182 100644 --- a/src/plugins/built_in/social_toolkit_plugin/__init__.py +++ b/src/plugins/built_in/social_toolkit_plugin/__init__.py @@ -1,6 +1,5 @@ from src.plugin_system.base.plugin_metadata import PluginMetadata -# 定义插件元数据 __plugin_meta__ = PluginMetadata( name="MoFox-Bot工具箱", description="一个集合多种实用功能的插件,旨在提升聊天体验和效率。", @@ -12,6 +11,4 @@ __plugin_meta__ = PluginMetadata( keywords=["emoji", "reaction", "like", "表情", "回应", "点赞"], categories=["Chat", "Integration"], extra={"is_built_in": "true", "plugin_type": "functional"}, - dependencies=[], - python_dependencies=["httpx", "Pillow"], ) diff --git a/src/plugins/built_in/tts_voice_plugin/__init__.py b/src/plugins/built_in/tts_voice_plugin/__init__.py index 463dcc244..8eaac0ab7 100644 --- a/src/plugins/built_in/tts_voice_plugin/__init__.py +++ b/src/plugins/built_in/tts_voice_plugin/__init__.py @@ -13,6 +13,5 @@ __plugin_meta__ = PluginMetadata( extra={ "is_built_in": False, "plugin_type": "tools", - }, - python_dependencies = ["aiohttp", "soundfile", "pedalboard"] + } ) diff --git a/src/plugins/built_in/tts_voice_plugin/actions/tts_action.py b/src/plugins/built_in/tts_voice_plugin/actions/tts_action.py index 587a60931..321f78536 100644 --- a/src/plugins/built_in/tts_voice_plugin/actions/tts_action.py +++ b/src/plugins/built_in/tts_voice_plugin/actions/tts_action.py @@ -2,8 +2,11 @@ TTS 语音合成 Action """ +from pathlib import Path + +import toml + from src.common.logger import get_logger -from src.plugin_system.apis import generator_api from src.plugin_system.base.base_action import BaseAction, ChatMode from ..services.manager import get_service @@ -11,24 +14,96 @@ from ..services.manager import get_service logger = get_logger("tts_voice_plugin.action") +def _get_available_styles() -> list[str]: + """动态读取配置文件,获取所有可用的TTS风格名称""" + try: + # 这个路径构建逻辑是为了确保无论从哪里启动,都能准确定位到配置文件 + plugin_file = Path(__file__).resolve() + # Bot/src/plugins/built_in/tts_voice_plugin/actions -> Bot + bot_root = plugin_file.parent.parent.parent.parent.parent.parent + config_file = bot_root / "config" / "plugins" / "tts_voice_plugin" / "config.toml" + + if not config_file.is_file(): + logger.warning("在 tts_action 中未找到 tts_voice_plugin 的配置文件,无法动态加载风格列表。") + return ["default"] + + config = toml.loads(config_file.read_text(encoding="utf-8")) + + styles_config = config.get("tts_styles", []) + if not isinstance(styles_config, list): + return ["default"] + + # 使用显式循环和类型检查来提取 style_name,以确保 Pylance 类型检查通过 + style_names: list[str] = [] + for style in styles_config: + if isinstance(style, dict): + name = style.get("style_name") + # 确保 name 是一个非空字符串 + if isinstance(name, str) and name: + style_names.append(name) + + return style_names if style_names else ["default"] + except Exception as e: + logger.error(f"动态加载TTS风格列表时出错: {e}", exc_info=True) + return ["default"] # 出现任何错误都回退 + + +# 在类定义之前执行函数,获取风格列表 +AVAILABLE_STYLES = _get_available_styles() +STYLE_OPTIONS_DESC = ", ".join(f"'{s}'" for s in AVAILABLE_STYLES) + + class TTSVoiceAction(BaseAction): """ 通过关键词或规划器自动触发 TTS 语音合成 """ action_name = "tts_voice_action" - action_description = "使用GPT-SoVITS将文本转换为语音并发送" + action_description = "将你生成好的文本转换为语音并发送。你必须提供要转换的文本。" mode_enable = ChatMode.ALL parallel_action = False + action_parameters = { + "text": { + "type": "string", + "description": "需要转换为语音并发送的完整、自然、适合口语的文本内容。", + "required": True + }, + "voice_style": { + "type": "string", + "description": f"语音的风格。可用选项: [{STYLE_OPTIONS_DESC}]。请根据对话的情感和上下文选择一个最合适的风格。如果未提供,将使用默认风格。", + "required": False + }, + "text_language": { + "type": "string", + "description": ( + "指定用于合成的语言模式,请务必根据文本内容选择最精确、范围最小的选项以获得最佳效果。" + "可用选项说明:\n" + "- 'zh': 中文与英文混合 (最优选)\n" + "- 'ja': 日文与英文混合 (最优选)\n" + "- 'yue': 粤语与英文混合 (最优选)\n" + "- 'ko': 韩文与英文混合 (最优选)\n" + "- 'en': 纯英文\n" + "- 'all_zh': 纯中文\n" + "- 'all_ja': 纯日文\n" + "- 'all_yue': 纯粤语\n" + "- 'all_ko': 纯韩文\n" + "- 'auto': 多语种混合自动识别 (备用选项,当前两种语言时优先使用上面的精确选项)\n" + "- 'auto_yue': 多语种混合自动识别(包含粤语)(备用选项)" + ), + "required": False + } + } + action_require = [ + "在调用此动作时,你必须在 'text' 参数中提供要合成语音的完整回复内容。这是强制性的。", "当用户明确请求使用语音进行回复时,例如‘发个语音听听’、‘用语音说’等。", "当对话内容适合用语音表达,例如讲故事、念诗、撒嬌或进行角色扮演时。", "在表达特殊情感(如安慰、鼓励、庆祝)的场景下,可以主动使用语音来增强感染力。", "不要在日常的、简短的问答或闲聊中频繁使用语音,避免打扰用户。", - "文本内容必须是纯粹的对话,不能包含任何括号或方括号括起来的动作、表情、或场景描述(例如,不要出现 '(笑)' 或 '[歪头]')", - "必须使用标准、完整的标点符号(如逗号、句号、问号)来进行自然的断句,以确保语音停顿自然,避免生成一长串没有停顿的文本。" + "提供的 'text' 内容必须是纯粹的对话,不能包含任何括号或方括号括起来的动作、表情、或场景描述(例如,不要出现 '(笑)' 或 '[歪头]')", + "【**铁则**】为了确保语音停顿自然,'text' 参数中的所有断句【必须】使用且仅能使用以下标准标点符号:','、'。'、'?'、'!'。严禁使用 '~'、'...' 或其他任何非标准符号来分隔句子,否则将导致语音合成失败。" ] def __init__(self, *args, **kwargs): @@ -80,16 +155,23 @@ class TTSVoiceAction(BaseAction): initial_text = self.action_data.get("text", "").strip() voice_style = self.action_data.get("voice_style", "default") - logger.info(f"{self.log_prefix} 接收到规划器的初步文本: '{initial_text[:70]}...'") + # 新增:从决策模型获取指定的语言模式 + text_language = self.action_data.get("text_language") # 如果模型没给,就是 None + logger.info(f"{self.log_prefix} 接收到规划器初步文本: '{initial_text[:70]}...', 指定风格: {voice_style}, 指定语言: {text_language}") - # 1. 请求主回复模型生成高质量文本 - text = await self._generate_final_text(initial_text) + # 1. 使用规划器提供的文本 + text = initial_text if not text: - logger.warning(f"{self.log_prefix} 最终生成的文本为空,静默处理。") - return False, "最终生成的文本为空" + logger.warning(f"{self.log_prefix} 规划器提供的文本为空,静默处理。") + return False, "规划器提供的文本为空" # 2. 调用 TTSService 生成语音 - audio_b64 = await self.tts_service.generate_voice(text, voice_style) + logger.info(f"{self.log_prefix} 使用最终文本进行语音合成: '{text[:70]}...'") + audio_b64 = await self.tts_service.generate_voice( + text=text, + style_hint=voice_style, + language_hint=text_language # 新增:将决策模型指定的语言传递给服务 + ) if audio_b64: await self.send_custom(message_type="voice", content=audio_b64) @@ -115,33 +197,3 @@ class TTSVoiceAction(BaseAction): ) return False, f"语音合成出错: {e!s}" - async def _generate_final_text(self, initial_text: str) -> str: - """请求主回复模型生成或优化文本""" - try: - generation_reason = ( - "这是一个为语音消息(TTS)生成文本的特殊任务。" - "请基于规划器提供的初步文本,结合对话历史和自己的人设,将它优化成一句自然、富有感情、适合用语音说出的话。" - "最终指令:请务-必确保文本听起来像真实的、自然的口语对话,而不是书面语。" - ) - - logger.info(f"{self.log_prefix} 请求主回复模型(replyer)全新生成TTS文本...") - success, response_set, _ = await generator_api.rewrite_reply( - chat_stream=self.chat_stream, - reply_data={"raw_reply": initial_text, "reason": generation_reason}, - request_type="replyer" - ) - - if success and response_set: - text = "".join(str(seg[1]) if isinstance(seg, tuple) else str(seg) for seg in response_set).strip() - logger.info(f"{self.log_prefix} 成功生成高质量TTS文本: {text}") - return text - - if initial_text: - logger.warning(f"{self.log_prefix} 主模型生成失败,使用规划器原始文本作为兜底。") - return initial_text - - raise Exception("主模型未能生成回复,且规划器也未提供兜底文本。") - - except Exception as e: - logger.error(f"{self.log_prefix} 生成高质量回复内容时失败: {e}", exc_info=True) - return "" diff --git a/src/plugins/built_in/tts_voice_plugin/plugin.py b/src/plugins/built_in/tts_voice_plugin/plugin.py index 5f2dcb7ad..3d032d7d8 100644 --- a/src/plugins/built_in/tts_voice_plugin/plugin.py +++ b/src/plugins/built_in/tts_voice_plugin/plugin.py @@ -30,6 +30,7 @@ class TTSVoicePlugin(BasePlugin): plugin_author = "Kilo Code & 靚仔" config_file_name = "config.toml" dependencies = [] + python_dependencies = ["aiohttp", "soundfile", "pedalboard"] permission_nodes: list[PermissionNodeField] = [ PermissionNodeField(node_name="command.use", description="是否可以使用 /tts 命令"), diff --git a/src/plugins/built_in/tts_voice_plugin/services/tts_service.py b/src/plugins/built_in/tts_voice_plugin/services/tts_service.py index c00eb31dd..d11dbd925 100644 --- a/src/plugins/built_in/tts_voice_plugin/services/tts_service.py +++ b/src/plugins/built_in/tts_voice_plugin/services/tts_service.py @@ -80,21 +80,34 @@ class TTSService: "prompt_language": style_cfg.get("prompt_language", "zh"), "gpt_weights": style_cfg.get("gpt_weights", default_gpt_weights), "sovits_weights": style_cfg.get("sovits_weights", default_sovits_weights), - "speed_factor": style_cfg.get("speed_factor"), # 读取独立的语速配置 + "speed_factor": style_cfg.get("speed_factor"), + "text_language": style_cfg.get("text_language", "auto"), # 新增:读取文本语言模式 } return styles - # ... [其他方法保持不变] ... - def _detect_language(self, text: str) -> str: - chinese_chars = len(re.findall(r"[\u4e00-\u9fff]", text)) - english_chars = len(re.findall(r"[a-zA-Z]", text)) + def _determine_final_language(self, text: str, mode: str) -> str: + """根据配置的语言策略和文本内容,决定最终发送给API的语言代码""" + # 如果策略是具体的语言(如 all_zh, ja),直接使用 + if mode not in ["auto", "auto_yue"]: + return mode + + # 对于 auto 和 auto_yue 策略,进行内容检测 + # 优先检测粤语 + if mode == "auto_yue": + cantonese_keywords = ["嘅", "喺", "咗", "唔", "係", "啲", "咩", "乜", "喂"] + if any(keyword in text for keyword in cantonese_keywords): + logger.info("在 auto_yue 模式下检测到粤语关键词,最终语言: yue") + return "yue" + + # 检测日语(简单启发式规则) japanese_chars = len(re.findall(r"[\u3040-\u309f\u30a0-\u30ff]", text)) - total_chars = chinese_chars + english_chars + japanese_chars - if total_chars == 0: return "zh" - if chinese_chars / total_chars > 0.3: return "zh" - elif japanese_chars / total_chars > 0.3: return "ja" - elif english_chars / total_chars > 0.8: return "en" - else: return "zh" + if japanese_chars > 5 and japanese_chars > len(re.findall(r"[\u4e00-\u9fff]", text)) * 0.5: + logger.info("检测到日语字符,最终语言: ja") + return "ja" + + # 默认回退到中文 + logger.info(f"在 {mode} 模式下未检测到特定语言,默认回退到: zh") + return "zh" def _clean_text_for_tts(self, text: str) -> str: # 1. 基本清理 @@ -259,7 +272,7 @@ class TTSService: logger.error(f"应用空间效果时出错: {e}", exc_info=True) return audio_data # 如果出错,返回原始音频 - async def generate_voice(self, text: str, style_hint: str = "default") -> str | None: + async def generate_voice(self, text: str, style_hint: str = "default", language_hint: str | None = None) -> str | None: self._load_config() if not self.tts_styles: @@ -282,11 +295,21 @@ class TTSService: clean_text = self._clean_text_for_tts(text) if not clean_text: return None - text_language = self._detect_language(clean_text) - logger.info(f"开始TTS语音合成,文本:{clean_text[:50]}..., 风格:{style}") + # 语言决策流程: + # 1. 优先使用决策模型直接指定的 language_hint (最高优先级) + if language_hint: + final_language = language_hint + logger.info(f"使用决策模型指定的语言: {final_language}") + else: + # 2. 如果模型未指定,则使用风格配置的 language_policy + language_policy = server_config.get("text_language", "auto") + final_language = self._determine_final_language(clean_text, language_policy) + logger.info(f"决策模型未指定语言,使用策略 '{language_policy}' -> 最终语言: {final_language}") + + logger.info(f"开始TTS语音合成,文本:{clean_text[:50]}..., 风格:{style}, 最终语言: {final_language}") audio_data = await self._call_tts_api( - server_config=server_config, text=clean_text, text_language=text_language, + server_config=server_config, text=clean_text, text_language=final_language, refer_wav_path=server_config.get("refer_wav_path"), prompt_text=server_config.get("prompt_text"), prompt_language=server_config.get("prompt_language"), diff --git a/src/plugins/built_in/web_search_tool/__init__.py b/src/plugins/built_in/web_search_tool/__init__.py index 2a9dfc1bf..1ebf0bec1 100644 --- a/src/plugins/built_in/web_search_tool/__init__.py +++ b/src/plugins/built_in/web_search_tool/__init__.py @@ -1,4 +1,3 @@ -from src.plugin_system.base.component_types import PythonDependency from src.plugin_system.base.plugin_metadata import PluginMetadata __plugin_meta__ = PluginMetadata( @@ -14,26 +13,4 @@ __plugin_meta__ = PluginMetadata( extra={ "is_built_in": True, }, - # Python包依赖列表 - python_dependencies = [ - PythonDependency(package_name="asyncddgs", description="异步DuckDuckGo搜索库", optional=False), - PythonDependency( - package_name="exa_py", - description="Exa搜索API客户端库", - optional=True, # 如果没有API密钥,这个是可选的 - ), - PythonDependency( - package_name="tavily", - install_name="tavily-python", # 安装时使用这个名称 - description="Tavily搜索API客户端库", - optional=True, # 如果没有API密钥,这个是可选的 - ), - PythonDependency( - package_name="httpx", - version=">=0.20.0", - install_name="httpx[socks]", # 安装时使用这个名称(包含可选依赖) - description="支持SOCKS代理的HTTP客户端库", - optional=False, - ), - ] ) diff --git a/src/plugins/built_in/web_search_tool/plugin.py b/src/plugins/built_in/web_search_tool/plugin.py index a47a41ea1..93f302081 100644 --- a/src/plugins/built_in/web_search_tool/plugin.py +++ b/src/plugins/built_in/web_search_tool/plugin.py @@ -5,7 +5,7 @@ Web Search Tool Plugin """ from src.common.logger import get_logger -from src.plugin_system import BasePlugin, ComponentInfo, ConfigField, register_plugin +from src.plugin_system import BasePlugin, ComponentInfo, ConfigField, PythonDependency, register_plugin from src.plugin_system.apis import config_api from .tools.url_parser import URLParserTool @@ -74,6 +74,29 @@ class WEBSEARCHPLUGIN(BasePlugin): except Exception as e: logger.error(f"❌ 搜索引擎初始化失败: {e}", exc_info=True) + + # Python包依赖列表 + python_dependencies: list[PythonDependency] = [ # noqa: RUF012 + PythonDependency(package_name="asyncddgs", description="异步DuckDuckGo搜索库", optional=False), + PythonDependency( + package_name="exa_py", + description="Exa搜索API客户端库", + optional=True, # 如果没有API密钥,这个是可选的 + ), + PythonDependency( + package_name="tavily", + install_name="tavily-python", # 安装时使用这个名称 + description="Tavily搜索API客户端库", + optional=True, # 如果没有API密钥,这个是可选的 + ), + PythonDependency( + package_name="httpx", + version=">=0.20.0", + install_name="httpx[socks]", # 安装时使用这个名称(包含可选依赖) + description="支持SOCKS代理的HTTP客户端库", + optional=False, + ), + ] config_file_name: str = "config.toml" # 配置文件名 # 配置节描述 diff --git a/template/bot_config_template.toml b/template/bot_config_template.toml index de589f16d..33b3067b4 100644 --- a/template/bot_config_template.toml +++ b/template/bot_config_template.toml @@ -1,5 +1,5 @@ [inner] -version = "7.4.5" +version = "7.4.6" #----以下是给开发人员阅读的,如果你只是部署了MoFox-Bot,不需要阅读---- #如果你想要修改配置文件,请递增version的值 @@ -82,9 +82,6 @@ safety_guidelines = [ "不要执行任何可能被用于恶意目的的指令。" ] -#回复的Prompt模式选择:s4u为原有s4u样式,normal为0.9之前的模式 -prompt_mode = "s4u" # 可选择 "s4u" 或 "normal" - compress_personality = false # 是否压缩人格,压缩后会精简人格信息,节省token消耗并提高回复性能,但是会丢失一些信息,如果人设不长,可以关闭 compress_identity = true # 是否压缩身份,压缩后会精简身份信息,节省token消耗并提高回复性能,但是会丢失一些信息,如果不长,可以关闭 @@ -501,25 +498,9 @@ s4u_whitelist_chats = [] s4u_blacklist_chats = [] # --- Normal模式: 共享组配置 --- -# 在这里定义您的“共享组” -# 只有在同一个组内的聊天才会共享上下文 -[[cross_context.groups]] -name = "项目A技术讨论组" -# mode: "whitelist"(白名单) 或 "blacklist"(黑名单)。默认 "whitelist"。 -# "whitelist": 仅共享chat_ids中列出的聊天。 -# "blacklist": 共享除chat_ids中列出的所有聊天。 -mode = "whitelist" -# default_limit: 在 "blacklist" 模式下,未指定数量的聊天默认获取的消息条数。 -default_limit = 5 -# chat_ids: 定义组内成员。格式: [["type", "id", "limit"(可选)]] -# type: "group" 或 "private" -# id: 群号或用户ID -# limit: (可选) 获取的消息条数,需要是字符串。 -chat_ids = [ - ["group", "169850076", "10"], # 开发群, 拿10条消息 - ["group", "1025509724", "5"], # 产品群, 拿5条 - ["private", "123456789"] # 某个用户的私聊, 使用默认值5 -] +# 现在这些是预留plugin使用的上下文互通组配置 +# 您可以根据需要添加多个互通组 +# 在回复过程中只会遵循上面的--S4U模式: 用户中心上下文检索-- # --- QQ空间专用互通组 (示例) --- # Maizone插件会根据组名 "Maizone默认互通组" 来获取上下文