This commit is contained in:
Windpicker-owo
2025-10-25 14:02:28 +08:00
34 changed files with 971 additions and 1156 deletions

View File

@@ -1,7 +1,9 @@
import logging
import random
from typing import Any
from src.common.logger import get_logger
# 修正导入路径让Pylance不再抱怨
from src.plugin_system import (
BaseAction,
BaseEventHandler,
@@ -17,9 +19,9 @@ from src.plugin_system import (
ToolParamType,
register_plugin,
)
from src.plugin_system.base.component_types import InjectionRule,InjectionType
from src.plugin_system.base.base_event import HandlerResult
logger = get_logger("hello_world_plugin")
class StartupMessageHandler(BaseEventHandler):
"""启动时打印消息的事件处理器。"""
@@ -29,7 +31,7 @@ class StartupMessageHandler(BaseEventHandler):
init_subscribe = [EventType.ON_START]
async def execute(self, params: dict) -> HandlerResult:
logging.info("🎉 Hello World 插件已启动,准备就绪!")
logger.info("🎉 Hello World 插件已启动,准备就绪!")
return HandlerResult(success=True, continue_process=True)
@@ -186,7 +188,7 @@ class WeatherPrompt(BasePrompt):
prompt_name = "weather_info_prompt"
prompt_description = "向Planner注入当前天气信息以丰富对话上下文。"
injection_rules = [InjectionRule(target_prompt="planner_prompt", injection_type=InjectionType.REPLACE, target_content="## 可用动作列表")]
injection_point = "planner_prompt"
async def execute(self) -> str:
# 在实际应用中这里可以调用天气API

View File

@@ -659,41 +659,6 @@ class ChatBot:
group_name = getattr(group_info, "group_name", None)
group_platform = getattr(group_info, "platform", None)
# 准备 additional_config将 format_info 嵌入其中
additional_config_str = None
try:
import orjson
additional_config_data = {}
# 首先获取adapter传递的additional_config
if hasattr(message_info, 'additional_config') and message_info.additional_config:
if isinstance(message_info.additional_config, dict):
additional_config_data = message_info.additional_config.copy()
elif isinstance(message_info.additional_config, str):
try:
additional_config_data = orjson.loads(message_info.additional_config)
except Exception as e:
logger.warning(f"无法解析 additional_config JSON: {e}")
additional_config_data = {}
# 然后添加format_info到additional_config中
if hasattr(message_info, 'format_info') and message_info.format_info:
try:
format_info_dict = message_info.format_info.to_dict()
additional_config_data["format_info"] = format_info_dict
logger.debug(f"[bot.py] 嵌入 format_info 到 additional_config: {format_info_dict}")
except Exception as e:
logger.warning(f"将 format_info 转换为字典失败: {e}")
else:
logger.warning(f"[bot.py] [问题] 消息缺少 format_info: message_id={message_id}")
# 序列化为JSON字符串
if additional_config_data:
additional_config_str = orjson.dumps(additional_config_data).decode("utf-8")
except Exception as e:
logger.error(f"准备 additional_config 失败: {e}")
# 创建数据库消息对象
db_message = DatabaseMessages(
message_id=message_id,
@@ -709,7 +674,6 @@ class ChatBot:
is_notify=bool(message.is_notify),
is_public_notice=bool(message.is_public_notice),
notice_type=message.notice_type,
additional_config=additional_config_str,
user_id=user_id,
user_nickname=user_nickname,
user_cardname=user_cardname,

View File

@@ -213,8 +213,8 @@ class ChatStream:
priority_info=json.dumps(getattr(message, "priority_info", None))
if getattr(message, "priority_info", None)
else None,
# 额外配置 - 需要将 format_info 嵌入到 additional_config 中
additional_config=self._prepare_additional_config(message_info),
# 额外配置
additional_config=getattr(message_info, "additional_config", None),
# 用户信息
user_id=str(getattr(user_info, "user_id", "")),
user_nickname=getattr(user_info, "user_nickname", ""),
@@ -253,59 +253,8 @@ class ChatStream:
f"interest_value: {db_message.interest_value}"
)
def _prepare_additional_config(self, message_info) -> str | None:
"""
准备 additional_config将 format_info 嵌入其中
这个方法模仿 storage.py 中的逻辑,确保 DatabaseMessages 中的 additional_config
包含 format_info使得 action_modifier 能够正确获取适配器支持的消息类型
Args:
message_info: BaseMessageInfo 对象
Returns:
str | None: JSON 字符串格式的 additional_config如果为空则返回 None
"""
import orjson
# 首先获取adapter传递的additional_config
additional_config_data = {}
if hasattr(message_info, 'additional_config') and message_info.additional_config:
if isinstance(message_info.additional_config, dict):
additional_config_data = message_info.additional_config.copy()
elif isinstance(message_info.additional_config, str):
# 如果是字符串,尝试解析
try:
additional_config_data = orjson.loads(message_info.additional_config)
except Exception as e:
logger.warning(f"无法解析 additional_config JSON: {e}")
additional_config_data = {}
# 然后添加format_info到additional_config中
if hasattr(message_info, 'format_info') and message_info.format_info:
try:
format_info_dict = message_info.format_info.to_dict()
additional_config_data["format_info"] = format_info_dict
logger.debug(f"嵌入 format_info 到 additional_config: {format_info_dict}")
except Exception as e:
logger.warning(f"将 format_info 转换为字典失败: {e}")
else:
logger.warning(f"[问题] 消息缺少 format_info: message_id={getattr(message_info, 'message_id', 'unknown')}")
logger.warning("[问题] 这可能导致 Action 无法正确检查适配器支持的类型")
# 序列化为JSON字符串
if additional_config_data:
try:
return orjson.dumps(additional_config_data).decode("utf-8")
except Exception as e:
logger.error(f"序列化 additional_config 失败: {e}")
return None
return None
def _safe_get_actions(self, message: "MessageRecv") -> list | None:
"""安全获取消息的actions字段"""
import json
try:
actions = getattr(message, "actions", None)
if actions is None:
@@ -314,6 +263,8 @@ class ChatStream:
# 如果是字符串尝试解析为JSON
if isinstance(actions, str):
try:
import json
actions = json.loads(actions)
except json.JSONDecodeError:
logger.warning(f"无法解析actions JSON字符串: {actions}")

View File

@@ -230,7 +230,7 @@ class OptimizedChatStream:
priority_info=json.dumps(getattr(message, "priority_info", None))
if getattr(message, "priority_info", None)
else None,
additional_config=self._prepare_additional_config(message_info),
additional_config=getattr(message_info, "additional_config", None),
user_id=str(getattr(user_info, "user_id", "")),
user_nickname=getattr(user_info, "user_nickname", ""),
user_cardname=getattr(user_info, "user_cardname", None),
@@ -342,59 +342,8 @@ class OptimizedChatStream:
return instance
def _prepare_additional_config(self, message_info) -> str | None:
"""
准备 additional_config将 format_info 嵌入其中
这个方法模仿 storage.py 中的逻辑,确保 DatabaseMessages 中的 additional_config
包含 format_info使得 action_modifier 能够正确获取适配器支持的消息类型
Args:
message_info: BaseMessageInfo 对象
Returns:
str | None: JSON 字符串格式的 additional_config如果为空则返回 None
"""
import orjson
# 首先获取adapter传递的additional_config
additional_config_data = {}
if hasattr(message_info, 'additional_config') and message_info.additional_config:
if isinstance(message_info.additional_config, dict):
additional_config_data = message_info.additional_config.copy()
elif isinstance(message_info.additional_config, str):
# 如果是字符串,尝试解析
try:
additional_config_data = orjson.loads(message_info.additional_config)
except Exception as e:
logger.warning(f"无法解析 additional_config JSON: {e}")
additional_config_data = {}
# 然后添加format_info到additional_config中
if hasattr(message_info, 'format_info') and message_info.format_info:
try:
format_info_dict = message_info.format_info.to_dict()
additional_config_data["format_info"] = format_info_dict
logger.debug(f"嵌入 format_info 到 additional_config: {format_info_dict}")
except Exception as e:
logger.warning(f"将 format_info 转换为字典失败: {e}")
else:
logger.warning(f"[问题] 消息缺少 format_info: message_id={getattr(message_info, 'message_id', 'unknown')}")
logger.warning("[问题] 这可能导致 Action 无法正确检查适配器支持的类型")
# 序列化为JSON字符串
if additional_config_data:
try:
return orjson.dumps(additional_config_data).decode("utf-8")
except Exception as e:
logger.error(f"序列化 additional_config 失败: {e}")
return None
return None
def _safe_get_actions(self, message: "MessageRecv") -> list | None:
"""安全获取消息的actions字段"""
import json
try:
actions = getattr(message, "actions", None)
if actions is None:
@@ -402,6 +351,8 @@ class OptimizedChatStream:
if isinstance(actions, str):
try:
import json
actions = json.loads(actions)
except json.JSONDecodeError:
logger.warning(f"无法解析actions JSON字符串: {actions}")

View File

@@ -99,27 +99,6 @@ class MessageStorage:
# 将priority_info字典序列化为JSON字符串以便存储到数据库的Text字段
priority_info_json = orjson.dumps(priority_info).decode("utf-8") if priority_info else None
# 准备additional_config包含format_info和其他配置
additional_config_data = None
# 首先获取adapter传递的additional_config
if hasattr(message.message_info, 'additional_config') and message.message_info.additional_config:
additional_config_data = message.message_info.additional_config.copy() # 避免修改原始对象
else:
additional_config_data = {}
# 然后添加format_info到additional_config中
if hasattr(message.message_info, 'format_info') and message.message_info.format_info:
format_info_dict = message.message_info.format_info.to_dict()
additional_config_data["format_info"] = format_info_dict
logger.debug(f"保存format_info: {format_info_dict}")
else:
logger.warning(f"[问题] 消息缺少format_info: message_id={getattr(message.message_info, 'message_id', 'unknown')}")
logger.warning("[问题] 这可能导致Action无法正确检查适配器支持的类型")
# 序列化为JSON字符串以便存储
additional_config_json = orjson.dumps(additional_config_data).decode("utf-8") if additional_config_data else None
# 获取数据库会话
new_message = Messages(
@@ -155,7 +134,6 @@ class MessageStorage:
is_command=is_command,
key_words=key_words,
key_words_lite=key_words_lite,
additional_config=additional_config_json,
)
async with get_db_session() as session:
session.add(new_message)

View File

@@ -165,6 +165,7 @@ class ChatterActionManager:
执行结果
"""
chat_stream = None
try:
logger.debug(f"🎯 [ActionManager] execute_action接收到 target_message: {target_message}")
# 通过chat_id获取chat_stream
@@ -180,6 +181,9 @@ class ChatterActionManager:
"error": "chat_stream not found",
}
# 设置正在回复的状态
chat_stream.context_manager.context.is_replying = True
if action_name == "no_action":
return {"action_type": "no_action", "success": True, "reply_text": "", "command": ""}
@@ -205,7 +209,7 @@ class ChatterActionManager:
action_build_into_prompt=False,
action_prompt_display=reason,
action_done=True,
thinking_id=thinking_id,
thinking_id=thinking_id or "",
action_data={"reason": reason},
action_name="no_reply",
)
@@ -298,6 +302,10 @@ class ChatterActionManager:
"loop_info": None,
"error": str(e),
}
finally:
# 确保重置正在回复的状态
if chat_stream:
chat_stream.context_manager.context.is_replying = False
async def _record_action_to_message(self, chat_stream, action_name, target_message, action_data):
"""

View File

@@ -4,8 +4,6 @@ import random
import time
from typing import TYPE_CHECKING, Any
import orjson
from src.chat.message_receive.chat_stream import get_chat_manager
from src.chat.planner_actions.action_manager import ChatterActionManager
from src.chat.utils.chat_message_builder import build_readable_messages, get_raw_msg_before_timestamp_with_chat
@@ -184,133 +182,13 @@ class ActionModifier:
def _check_action_associated_types(self, all_actions: dict[str, ActionInfo], chat_context: StreamContext):
type_mismatched_actions: list[tuple[str, str]] = []
for action_name, action_info in all_actions.items():
if action_info.associated_types and not self._check_action_output_types(action_info.associated_types, chat_context):
if action_info.associated_types and not chat_context.check_types(action_info.associated_types):
associated_types_str = ", ".join(action_info.associated_types)
reason = f"适配器不支持(需要: {associated_types_str}"
type_mismatched_actions.append((action_name, reason))
logger.debug(f"{self.log_prefix}决定移除动作: {action_name},原因: {reason}")
return type_mismatched_actions
def _check_action_output_types(self, output_types: list[str], chat_context: StreamContext) -> bool:
"""
检查Action的输出类型是否被当前适配器支持
Args:
output_types: Action需要输出的消息类型列表
chat_context: 聊天上下文
Returns:
bool: 如果所有输出类型都支持则返回True
"""
# 获取当前适配器支持的输出类型
adapter_supported_types = self._get_adapter_supported_output_types(chat_context)
# 检查所有需要的输出类型是否都被支持
for output_type in output_types:
if output_type not in adapter_supported_types:
logger.debug(f"适配器不支持输出类型 '{output_type}',支持的类型: {adapter_supported_types}")
return False
return True
def _get_adapter_supported_output_types(self, chat_context: StreamContext) -> list[str]:
"""
获取当前适配器支持的输出类型列表
Args:
chat_context: 聊天上下文
Returns:
list[str]: 支持的输出类型列表
"""
# 检查additional_config是否存在且不为空
additional_config = None
has_additional_config = False
# 先检查 current_message 是否存在
if not chat_context.current_message:
logger.warning(f"{self.log_prefix} [问题] chat_context.current_message 为 None无法获取适配器支持的类型")
return ["text", "emoji"] # 返回基础类型
if hasattr(chat_context.current_message, "additional_config"):
additional_config = chat_context.current_message.additional_config
# 更准确的非空判断
if additional_config is not None:
if isinstance(additional_config, str) and additional_config.strip():
has_additional_config = True
elif isinstance(additional_config, dict):
# 字典存在就可以即使为空也可能有format_info字段
has_additional_config = True
else:
logger.warning(f"{self.log_prefix} [问题] current_message 没有 additional_config 属性")
logger.debug(f"{self.log_prefix} [调试] has_additional_config: {has_additional_config}")
if has_additional_config:
try:
logger.debug(f"{self.log_prefix} [调试] 开始解析 additional_config")
format_info = None
# 处理additional_config可能是字符串或字典的情况
if isinstance(additional_config, str):
# 如果是字符串尝试解析为JSON
try:
config = orjson.loads(additional_config)
format_info = config.get("format_info")
except (orjson.JSONDecodeError, AttributeError, TypeError) as e:
format_info = None
elif isinstance(additional_config, dict):
# 如果是字典直接获取format_info
format_info = additional_config.get("format_info")
# 如果找到了format_info从中提取支持的类型
if format_info:
if "accept_format" in format_info:
accept_format = format_info["accept_format"]
if isinstance(accept_format, str):
accept_format = [accept_format]
elif isinstance(accept_format, list):
pass
else:
accept_format = list(accept_format) if hasattr(accept_format, "__iter__") else []
# 合并基础类型和适配器特定类型
result = list(set(accept_format))
return result
# 备用检查content_format字段
elif "content_format" in format_info:
content_format = format_info["content_format"]
logger.debug(f"{self.log_prefix} [调试] 找到 content_format: {content_format}")
if isinstance(content_format, str):
content_format = [content_format]
elif isinstance(content_format, list):
pass
else:
content_format = list(content_format) if hasattr(content_format, "__iter__") else []
result = list(set(content_format))
return result
else:
logger.warning(f"{self.log_prefix} [问题] additional_config 中没有 format_info 字段")
except Exception as e:
logger.error(f"{self.log_prefix} [问题] 解析适配器格式信息失败: {e}", exc_info=True)
else:
logger.warning(f"{self.log_prefix} [问题] additional_config 不存在或为空")
# 如果无法获取格式信息,返回默认支持的基础类型
default_types = ["text", "emoji"]
logger.warning(
f"{self.log_prefix} [问题] 无法从适配器获取支持的消息类型,使用默认类型: {default_types}"
)
logger.warning(
f"{self.log_prefix} [问题] 这可能导致某些 Action 被错误地过滤。"
f"请检查适配器是否正确设置了 format_info。"
)
return default_types
async def _get_deactivated_actions_by_type(
self,
actions_with_info: dict[str, ActionInfo],

View File

@@ -162,62 +162,6 @@ If you need to use the search tool, please directly call the function "lpmm_sear
name="lpmm_get_knowledge_prompt",
)
# normal 版 prompt 模板0.9之前的简化模式)
logger.debug("[Prompt模式调试] 正在注册normal_style_prompt模板")
Prompt(
"""
{chat_scene}
**重要:消息针对性判断**
在回应之前,首先分析消息的针对性:
1. **直接针对你**@你、回复你、明确询问你 → 必须回应
2. **间接相关**:涉及你感兴趣的话题但未直接问你 → 谨慎参与
3. **他人对话**:与你无关的私人交流 → 通常不参与
4. **重复内容**:他人已充分回答的问题 → 避免重复
{expression_habits_block}
{tool_info_block}
{knowledge_prompt}
{memory_block}
{relation_info_block}
{extra_info_block}
{notice_block}
{cross_context_block}
{identity}
如果有人说你是人机,你可以用一种阴阳怪气的口吻来回应
{schedule_block}
{action_descriptions}
下面是群里最近的聊天内容:
--------------------------------
{time_block}
{chat_info}
--------------------------------
{reply_target_block}
你现在的心情是:{mood_state}
{config_expression_style}
注意不要复读你前面发过的内容,意思相近也不行。
{keywords_reaction_prompt}
请注意不要输出多余内容(包括前后缀冒号和引号at或 @等 )。只输出回复内容。
{moderation_prompt}
你的核心任务是针对 {reply_target_block} 中提到的内容,{relation_info_block}生成一段紧密相关且能推动对话的回复。你的回复应该:
1. 明确回应目标消息,而不是宽泛地评论。
2. 可以分享你的看法、提出相关问题,或者开个合适的玩笑。
3. 目的是让对话更有趣、更深入。
最终请输出一条简短、完整且口语化的回复。
*你叫{bot_name},也有人叫你{bot_nickname}*
现在,你说:
""",
"normal_style_prompt",
)
logger.debug("[Prompt模式调试] normal_style_prompt模板注册完成")
class DefaultReplyer:
@@ -1329,7 +1273,7 @@ class DefaultReplyer:
),
"cross_context": asyncio.create_task(
self._time_and_run_task(
Prompt.build_cross_context(chat_id, global_config.personality.prompt_mode, target_user_info),
Prompt.build_cross_context(chat_id, "s4u", target_user_info),
"cross_context",
)
),
@@ -1503,9 +1447,6 @@ class DefaultReplyer:
else:
reply_target_block = ""
# 根据配置选择模板
current_prompt_mode = global_config.personality.prompt_mode
# 动态生成聊天场景提示
if is_group_chat:
chat_scene_prompt = "你正在一个QQ群里聊天你需要理解整个群的聊天动态和话题走向并做出自然的回应。"
@@ -1524,7 +1465,7 @@ class DefaultReplyer:
available_actions=available_actions,
enable_tool=enable_tool,
chat_target_info=self.chat_target_info,
prompt_mode=current_prompt_mode,
prompt_mode="s4u",
message_list_before_now_long=message_list_before_now_long,
message_list_before_short=message_list_before_short,
chat_talking_prompt_short=chat_talking_prompt_short,
@@ -1552,13 +1493,7 @@ class DefaultReplyer:
)
# 使用新的统一Prompt系统 - 使用正确的模板名称
template_name = ""
if current_prompt_mode == "s4u":
template_name = "s4u_style_prompt"
elif current_prompt_mode == "normal":
template_name = "normal_style_prompt"
elif current_prompt_mode == "minimal":
template_name = "default_expressor_prompt"
template_name = "s4u_style_prompt"
# 获取模板内容
template_prompt = await global_prompt_manager.get_prompt_async(template_name)

View File

@@ -130,19 +130,15 @@ class PromptManager:
# 确保我们有有效的parameters实例
params_for_injection = parameters or original_prompt.parameters
# 应用所有匹配的注入规则,获取修改后的模板
modified_template = await prompt_component_manager.apply_injections(
target_prompt_name=original_prompt.name,
original_template=original_prompt.template,
params=params_for_injection,
components_prefix = await prompt_component_manager.execute_components_for(
injection_point=original_prompt.name, params=params_for_injection
)
# 如果模板被修改了就创建一个新的临时Prompt实例
if modified_template != original_prompt.template:
logger.info(f"'{name}'应用了Prompt注入规则")
if components_prefix:
logger.info(f"'{name}'注入插件内容: \n{components_prefix}")
# 创建一个新的临时Prompt实例不进行注册
new_template = f"{components_prefix}\n\n{original_prompt.template}"
temp_prompt = Prompt(
template=modified_template,
template=new_template,
name=original_prompt.name,
parameters=original_prompt.parameters,
should_register=False, # 确保不重新注册
@@ -400,8 +396,6 @@ class Prompt:
# 构建聊天历史
if self.parameters.prompt_mode == "s4u":
await self._build_s4u_chat_context(context_data)
else:
await self._build_normal_chat_context(context_data)
# 补充基础信息
context_data.update(
@@ -444,13 +438,6 @@ class Prompt:
context_data["read_history_prompt"] = read_history_prompt
context_data["unread_history_prompt"] = unread_history_prompt
async def _build_normal_chat_context(self, context_data: dict[str, Any]) -> None:
"""构建normal模式的聊天上下文"""
if not self.parameters.chat_talking_prompt_short:
return
context_data["chat_info"] = f"""群里的聊天内容:
{self.parameters.chat_talking_prompt_short}"""
async def _build_s4u_chat_history_prompts(
self, message_list_before_now: list[dict[str, Any]], target_user_id: str, sender: str, chat_id: str
@@ -790,8 +777,6 @@ class Prompt:
"""使用上下文数据格式化模板"""
if self.parameters.prompt_mode == "s4u":
params = self._prepare_s4u_params(context_data)
elif self.parameters.prompt_mode == "normal":
params = self._prepare_normal_params(context_data)
else:
params = self._prepare_default_params(context_data)
@@ -827,34 +812,6 @@ class Prompt:
or "你正在一个QQ群里聊天你需要理解整个群的聊天动态和话题走向并做出自然的回应。",
}
def _prepare_normal_params(self, context_data: dict[str, Any]) -> dict[str, Any]:
"""准备Normal模式的参数"""
return {
**context_data,
"expression_habits_block": context_data.get("expression_habits_block", ""),
"tool_info_block": context_data.get("tool_info_block", ""),
"knowledge_prompt": context_data.get("knowledge_prompt", ""),
"memory_block": context_data.get("memory_block", ""),
"relation_info_block": context_data.get("relation_info_block", ""),
"extra_info_block": self.parameters.extra_info_block or context_data.get("extra_info_block", ""),
"cross_context_block": context_data.get("cross_context_block", ""),
"notice_block": self.parameters.notice_block or context_data.get("notice_block", ""),
"identity": self.parameters.identity_block or context_data.get("identity", ""),
"action_descriptions": self.parameters.action_descriptions or context_data.get("action_descriptions", ""),
"schedule_block": self.parameters.schedule_block or context_data.get("schedule_block", ""),
"time_block": context_data.get("time_block", ""),
"chat_info": context_data.get("chat_info", ""),
"reply_target_block": context_data.get("reply_target_block", ""),
"config_expression_style": global_config.personality.reply_style,
"mood_state": self.parameters.mood_prompt or context_data.get("mood_state", ""),
"keywords_reaction_prompt": self.parameters.keywords_reaction_prompt
or context_data.get("keywords_reaction_prompt", ""),
"moderation_prompt": self.parameters.moderation_prompt_block or context_data.get("moderation_prompt", ""),
"safety_guidelines_block": self.parameters.safety_guidelines_block
or context_data.get("safety_guidelines_block", ""),
"chat_scene": self.parameters.chat_scene
or "你正在一个QQ群里聊天你需要理解整个群的聊天动态和话题走向并做出自然的回应。",
}
def _prepare_default_params(self, context_data: dict[str, Any]) -> dict[str, Any]:
"""准备默认模式的参数"""
@@ -1025,12 +982,7 @@ class Prompt:
if not chat_stream:
return ""
if prompt_mode == "normal":
context_group = await cross_context_api.get_context_group(chat_id)
if not context_group:
return ""
return await cross_context_api.build_cross_context_normal(chat_stream, context_group)
elif prompt_mode == "s4u":
if prompt_mode == "s4u":
return await cross_context_api.build_cross_context_s4u(chat_stream, target_user_info)
return ""
@@ -1083,12 +1035,12 @@ async def create_prompt_async(
# 动态注入插件内容
if name:
modified_template = await prompt_component_manager.apply_injections(
target_prompt_name=name, original_template=template, params=final_params
components_prefix = await prompt_component_manager.execute_components_for(
injection_point=name, params=final_params
)
if modified_template != template:
logger.debug(f"'{name}'应用了Prompt注入规则")
template = modified_template
if components_prefix:
logger.debug(f"'{name}'注入插件内容: \n{components_prefix}")
template = f"{components_prefix}\n\n{template}"
# 使用可能已修改的模板创建实例
prompt = create_prompt(template, name, final_params)

View File

@@ -1,11 +1,9 @@
import asyncio
import re
from typing import Type
from src.chat.utils.prompt_params import PromptParameters
from src.common.logger import get_logger
from src.plugin_system.base.base_prompt import BasePrompt
from src.plugin_system.base.component_types import ComponentType, InjectionRule, InjectionType, PromptInfo
from src.plugin_system.base.component_types import ComponentType, PromptInfo
from src.plugin_system.core.component_registry import component_registry
logger = get_logger("prompt_component_manager")
@@ -21,143 +19,89 @@ class PromptComponentManager:
3. 提供一个接口以便在构建核心Prompt时能够获取并执行所有相关的组件。
"""
def _get_rules_for(self, target_prompt_name: str) -> list[tuple[InjectionRule, Type[BasePrompt]]]:
def get_components_for(self, injection_point: str) -> list[type[BasePrompt]]:
"""
获取指定目标Prompt的所有注入规则及其关联的组件类。
获取指定注入点的所有已注册组件类。
Args:
target_prompt_name (str): 目标 Prompt 的名称。
injection_point: 目标Prompt的名称。
Returns:
list[tuple[InjectionRule, Type[BasePrompt]]]: 一个元组列表
每个元组包含一个注入规则和其对应的 Prompt 组件类,并已根据优先级排序。
list[Type[BasePrompt]]: 与该注入点关联的组件类列表
"""
# 从注册表中获取所有启用的 PROMPT 类型的组件
# 从组件注册中心获取所有启用的Prompt组件
enabled_prompts = component_registry.get_enabled_components_by_type(ComponentType.PROMPT)
matching_rules = []
# 遍历所有启用的 Prompt 组件,查找与目标 Prompt 相关的注入规则
matching_components: list[type[BasePrompt]] = []
for prompt_name, prompt_info in enabled_prompts.items():
# 确保 prompt_info 是 PromptInfo 类型
if not isinstance(prompt_info, PromptInfo):
continue
# prompt_info.injection_rules 已经经过了后向兼容处理,确保总是列表
for rule in prompt_info.injection_rules:
# 如果规则的目标是当前指定的 Prompt
if rule.target_prompt == target_prompt_name:
# 获取该规则对应的组件类
component_class = component_registry.get_component_class(prompt_name, ComponentType.PROMPT)
# 确保获取到的确实是一个 BasePrompt 的子类
if component_class and issubclass(component_class, BasePrompt):
matching_rules.append((rule, component_class))
# 获取注入点信息
injection_points = prompt_info.injection_point
if isinstance(injection_points, str):
injection_points = [injection_points]
# 根据规则的优先级进行排序,数字越小,优先级越高,越先应用
matching_rules.sort(key=lambda x: x[0].priority)
return matching_rules
# 检查当前注入点是否匹配
if injection_point in injection_points:
# 获取组件类
component_class = component_registry.get_component_class(prompt_name, ComponentType.PROMPT)
if component_class and issubclass(component_class, BasePrompt):
matching_components.append(component_class)
async def apply_injections(
self, target_prompt_name: str, original_template: str, params: PromptParameters
) -> str:
return matching_components
async def execute_components_for(self, injection_point: str, params: PromptParameters) -> str:
"""
获取、实例化并执行所有相关组件,然后根据注入规则修改原始模板
这是一个三步走的过程:
1. 实例化所有需要执行的组件。
2. 并行执行它们的 `execute` 方法以获取注入内容。
3. 按照优先级顺序,将内容注入到原始模板中。
实例化并执行指定注入点的所有组件,然后将它们的输出拼接成一个字符串
Args:
target_prompt_name (str): 目标 Prompt 的名称。
original_template (str): 原始的、未经修改的 Prompt 模板字符串
params (PromptParameters): 传递给 Prompt 组件实例的参数。
injection_point: 目标Prompt的名称。
params: 用于初始化组件的 PromptParameters 对象
Returns:
str: 应用了所有注入规则后,修改过的 Prompt 模板字符串
str: 所有相关组件生成的、用换行符连接的文本内容
"""
rules_with_classes = self._get_rules_for(target_prompt_name)
# 如果没有找到任何匹配的规则,就直接返回原始模板,啥也不干
if not rules_with_classes:
return original_template
# --- 第一步: 实例化所有需要执行的组件 ---
instance_map = {} # 存储组件实例,虽然目前没直接用,但留着总没错
tasks = [] # 存放所有需要并行执行的 execute 异步任务
components_to_execute = [] # 存放需要执行的组件类,用于后续结果映射
for rule, component_class in rules_with_classes:
# 如果注入类型是 REMOVE那就不需要执行组件了因为它不产生内容
if rule.injection_type != InjectionType.REMOVE:
try:
# 获取组件的元信息,主要是为了拿到插件名称来读取插件配置
prompt_info = component_registry.get_component_info(
component_class.prompt_name, ComponentType.PROMPT
)
if not isinstance(prompt_info, PromptInfo):
plugin_config = {}
else:
# 从注册表获取该组件所属插件的配置
plugin_config = component_registry.get_plugin_config(prompt_info.plugin_name)
# 实例化组件,并传入参数和插件配置
instance = component_class(params=params, plugin_config=plugin_config)
instance_map[component_class.prompt_name] = instance
# 将组件的 execute 方法作为一个任务添加到列表中
tasks.append(instance.execute())
components_to_execute.append(component_class)
except Exception as e:
logger.error(f"实例化 Prompt 组件 '{component_class.prompt_name}' 失败: {e}")
# 即使失败,也添加一个立即完成的空任务,以保持与其他任务的索引同步
tasks.append(asyncio.create_task(asyncio.sleep(0, result=e))) # type: ignore
# --- 第二步: 并行执行所有组件的 execute 方法 ---
# 使用 asyncio.gather 来同时运行所有任务,提高效率
results = await asyncio.gather(*tasks, return_exceptions=True)
# 创建一个从组件名到执行结果的映射,方便后续查找
result_map = {
components_to_execute[i].prompt_name: res
for i, res in enumerate(results)
if not isinstance(res, Exception) # 只包含成功的结果
}
# 单独处理并记录执行失败的组件
for i, res in enumerate(results):
if isinstance(res, Exception):
logger.error(f"执行 Prompt 组件 '{components_to_execute[i].prompt_name}' 失败: {res}")
# --- 第三步: 按优先级顺序应用注入规则 ---
modified_template = original_template
for rule, component_class in rules_with_classes:
# 从结果映射中获取该组件生成的内容
content = result_map.get(component_class.prompt_name)
component_classes = self.get_components_for(injection_point)
if not component_classes:
return ""
tasks = []
for component_class in component_classes:
try:
if rule.injection_type == InjectionType.PREPEND:
if content:
modified_template = f"{content}\n{modified_template}"
elif rule.injection_type == InjectionType.APPEND:
if content:
modified_template = f"{modified_template}\n{content}"
elif rule.injection_type == InjectionType.REPLACE:
# 使用正则表达式替换目标内容
if content and rule.target_content:
modified_template = re.sub(rule.target_content, str(content), modified_template)
elif rule.injection_type == InjectionType.INSERT_AFTER:
# 在匹配到的内容后面插入
if content and rule.target_content:
# re.sub a little trick: \g<0> represents the entire matched string
replacement = f"\\g<0>\n{content}"
modified_template = re.sub(rule.target_content, replacement, modified_template)
elif rule.injection_type == InjectionType.REMOVE:
# 使用正则表达式移除目标内容
if rule.target_content:
modified_template = re.sub(rule.target_content, "", modified_template)
except re.error as e:
logger.error(
f"在为 '{component_class.prompt_name}' 应用规则时发生正则错误: {e} (pattern: '{rule.target_content}')"
# 从注册中心获取组件信息
prompt_info = component_registry.get_component_info(
component_class.prompt_name, ComponentType.PROMPT
)
except Exception as e:
logger.error(f"应用 Prompt 注入规则 '{rule}' 失败: {e}")
if not isinstance(prompt_info, PromptInfo):
logger.warning(f"找不到 Prompt 组件 '{component_class.prompt_name}' 的信息,无法获取插件配置")
plugin_config = {}
else:
plugin_config = component_registry.get_plugin_config(prompt_info.plugin_name)
return modified_template
instance = component_class(params=params, plugin_config=plugin_config)
tasks.append(instance.execute())
except Exception as e:
logger.error(f"实例化 Prompt 组件 '{component_class.prompt_name}' 失败: {e}")
if not tasks:
return ""
# 并行执行所有组件
results = await asyncio.gather(*tasks, return_exceptions=True)
# 过滤掉执行失败的结果和空字符串
valid_results = []
for i, result in enumerate(results):
if isinstance(result, Exception):
logger.error(f"执行 Prompt 组件 '{component_classes[i].prompt_name}' 失败: {result}")
elif result and isinstance(result, str) and result.strip():
valid_results.append(result.strip())
# 使用换行符拼接所有有效结果
return "\n".join(valid_results)
# 创建全局单例

View File

@@ -64,7 +64,6 @@ class PersonalityConfig(ValidatedConfigBase):
default_factory=list, description="安全与互动底线Bot在任何情况下都必须遵守的原则"
)
reply_style: str = Field(default="", description="表达风格")
prompt_mode: Literal["s4u", "normal"] = Field(default="s4u", description="Prompt模式")
compress_personality: bool = Field(default=True, description="是否压缩人格")
compress_identity: bool = Field(default=True, description="是否压缩身份")
@@ -632,7 +631,6 @@ class CrossContextConfig(ValidatedConfigBase):
# --- Normal模式: 共享组配置 ---
groups: list[ContextGroup] = Field(default_factory=list, description="上下文共享组列表")
# --- S4U模式: 用户中心上下文检索 ---
s4u_mode: Literal["whitelist", "blacklist"] = Field(
default="whitelist",

View File

@@ -511,8 +511,10 @@ MoFox_Bot(第三方修改版)
logger.error(f"月度计划管理器初始化失败: {e}")
# 初始化日程管理器
if global_config.planning_system.schedule_enable:
try:
await schedule_manager.initialize()
await schedule_manager.load_or_generate_today_schedule()
await schedule_manager.start_daily_schedule_generation()
logger.info("日程表管理器初始化成功")
except Exception as e:
logger.error(f"日程表管理器初始化失败: {e}")

View File

@@ -1,180 +1,330 @@
"""
日程表与月度计划API模块
日程表与月度计划查询API模块
专门负责日程和月度计划信息的查询与管理采用标准Python包设计模式
所有对外接口均为异步函数,以便于插件开发者在异步环境中使用
本模块提供了一系列用于查询日程和月度计划的只读接口。
所有对外接口均为异步函数,专为插件开发者设计,以便在异步环境中无缝集成
核心功能:
- 查询指定日期的日程安排。
- 获取当前正在进行的活动。
- 筛选特定时间范围内的活动。
- 查询月度计划,支持随机抽样和计数。
- 所有查询接口均提供格式化输出选项。
使用方式:
import asyncio
from src.plugin_system.apis import schedule_api
async def main():
# 获取今日日程
today_schedule = await schedule_api.get_today_schedule()
# 获取今天的日程(原始数据)
today_schedule = await schedule_api.get_schedule()
if today_schedule:
print("今天的日程:", today_schedule)
# 获取昨天的日程,并格式化为字符串
from datetime import datetime, timedelta
yesterday = (datetime.now() - timedelta(days=1)).strftime("%Y-%m-%d")
formatted_schedule = await schedule_api.get_schedule(date=yesterday, formatted=True)
if formatted_schedule:
print(f"\\n昨天的日程 (格式化):\\n{formatted_schedule}")
# 获取当前活动
current_activity = await schedule_api.get_current_activity()
if current_activity:
print("当前活动:", current_activity)
print(f"\\n当前活动: {current_activity.get('activity')}")
# 获取本月月度计划
from datetime import datetime
this_month = datetime.now().strftime("%Y-%m")
plans = await schedule_api.get_monthly_plans(this_month)
if plans:
print(f"{this_month} 的月度计划:", [p.plan_text for p in plans])
# 获取本月月度计划总数
plan_count = await schedule_api.count_monthly_plans()
print(f"\\n本月月度计划总数: {plan_count}")
# 随机获取本月的2个计划
random_plans = await schedule_api.get_monthly_plans(random_count=2)
if random_plans:
print("\\n随机的2个计划:", [p.plan_text for p in random_plans])
asyncio.run(main())
"""
import random
from datetime import datetime
from typing import Any
from src.common.database.sqlalchemy_models import MonthlyPlan
import orjson
from sqlalchemy import func, select
from src.common.database.sqlalchemy_models import MonthlyPlan, Schedule, get_db_session
from src.common.logger import get_logger
from src.schedule.database import get_active_plans_for_month
from src.schedule.schedule_manager import schedule_manager
logger = get_logger("schedule_api")
# --- 内部辅助函数 ---
def _format_schedule_list(
items: list[dict[str, Any]] | list[MonthlyPlan],
template: str,
item_type: str,
) -> str:
"""将日程或计划列表格式化为字符串"""
if not items:
return ""
lines = []
for item in items:
if item_type == "schedule" and isinstance(item, dict):
lines.append(template.format(time_range=item.get("time_range", ""), activity=item.get("activity", "")))
elif item_type == "plan" and isinstance(item, MonthlyPlan):
lines.append(template.format(plan_text=item.plan_text))
return "\\n".join(lines)
async def _get_schedule_from_db(date_str: str) -> list[dict[str, Any]] | None:
"""从数据库中获取并解析指定日期的日程"""
async with get_db_session() as session:
result = await session.execute(select(Schedule).filter(Schedule.date == date_str))
schedule_record = result.scalars().first()
if schedule_record and schedule_record.schedule_data:
try:
return orjson.loads(str(schedule_record.schedule_data))
except orjson.JSONDecodeError:
logger.warning(f"无法解析数据库中的日程数据 (日期: {date_str})")
return None
# --- API实现 ---
class ScheduleAPI:
"""日程表与月度计划API - 负责日程和计划信息的查询与管理"""
"""日程表与月度计划查询API"""
@staticmethod
async def get_today_schedule() -> list[dict[str, Any]] | None:
"""(异步) 获取今天的日程安排
async def get_schedule(
date: str | None = None,
formatted: bool = False,
format_template: str = "{time_range}: {activity}",
) -> list[dict[str, Any]] | str | None:
"""
(异步) 获取指定日期的日程安排。
Args:
date (Optional[str]): 目标日期,格式 "YYYY-MM-DD"。如果为None则使用当前日期。
formatted (bool): 如果为True返回格式化的字符串否则返回原始数据列表。
format_template (str): 当 formatted=True 时使用的格式化模板。
Returns:
Optional[List[Dict[str, Any]]]: 今天的日程列表,如果未生成或未启用则返回None
Union[List[Dict[str, Any]], str, None]: 日程数据或None
"""
target_date = date or datetime.now().strftime("%Y-%m-%d")
try:
logger.debug("[ScheduleAPI] 正在获取今天的日程安排...")
return schedule_manager.today_schedule
logger.debug(f"[ScheduleAPI] 正在获取 {target_date} 的日程安排...")
schedule_data = await _get_schedule_from_db(target_date)
if schedule_data is None:
return None
if formatted:
return _format_schedule_list(schedule_data, format_template, "schedule")
return schedule_data
except Exception as e:
logger.error(f"[ScheduleAPI] 获取今日日程失败: {e}")
logger.error(f"[ScheduleAPI] 获取 {target_date} 日程失败: {e}")
return None
@staticmethod
async def get_current_activity() -> str | None:
"""(异步) 获取当前正在进行的活动
async def get_current_activity(
formatted: bool = False,
format_template: str = "{time_range}: {activity}",
) -> dict[str, Any] | str | None:
"""
(异步) 获取当前正在进行的活动。
Args:
formatted (bool): 如果为True返回格式化的字符串否则返回活动字典。
format_template (str): 当 formatted=True 时使用的格式化模板。
Returns:
Optional[str]: 当前活动名称,如果没有则返回None
Union[Dict[str, Any], str, None]: 当前活动数据或None
"""
try:
logger.debug("[ScheduleAPI] 正在获取当前活动...")
return schedule_manager.get_current_activity()
today_schedule = await _get_schedule_from_db(datetime.now().strftime("%Y-%m-%d"))
if not today_schedule:
return None
now = datetime.now().time()
for event in today_schedule:
time_range = event.get("time_range")
if not time_range:
continue
try:
start_str, end_str = time_range.split("-")
start_time = datetime.strptime(start_str.strip(), "%H:%M").time()
end_time = datetime.strptime(end_str.strip(), "%H:%M").time()
if (start_time <= now < end_time) or \
(end_time < start_time and (now >= start_time or now < end_time)):
if formatted:
return _format_schedule_list([event], format_template, "schedule")
return event
except (ValueError, KeyError):
continue
return None
except Exception as e:
logger.error(f"[ScheduleAPI] 获取当前活动失败: {e}")
return None
@staticmethod
async def regenerate_schedule() -> bool:
"""(异步) 触发后台重新生成今天的日程
Returns:
bool: 是否成功触发
async def get_activities_between(
start_time: str,
end_time: str,
date: str | None = None,
formatted: bool = False,
format_template: str = "{time_range}: {activity}",
) -> list[dict[str, Any]] | str | None:
"""
try:
logger.info("[ScheduleAPI] 正在触发后台重新生成日程...")
await schedule_manager.generate_and_save_schedule()
return True
except Exception as e:
logger.error(f"[ScheduleAPI] 触发日程重新生成失败: {e}")
return False
@staticmethod
async def get_monthly_plans(target_month: str | None = None) -> list[MonthlyPlan]:
"""(异步) 获取指定月份的有效月度计划
(异步) 获取指定日期和时间范围内的所有活动。
Args:
target_month (Optional[str]): 目标月份,格式 "YYYY-MM"如果为None则使用当前月份。
start_time (str): 开始时间,格式 "HH:MM"
end_time (str): 结束时间,格式 "HH:MM"
date (Optional[str]): 目标日期,格式 "YYYY-MM-DD"。如果为None则使用当前日期。
formatted (bool): 如果为True返回格式化的字符串否则返回活动列表。
format_template (str): 当 formatted=True 时使用的格式化模板。
Returns:
List[MonthlyPlan]: 月度计划对象列表
Union[List[Dict[str, Any]], str, None]: 在时间范围内的活动列表或None。
"""
if target_month is None:
target_month = datetime.now().strftime("%Y-%m")
target_date = date or datetime.now().strftime("%Y-%m-%d")
try:
logger.debug(f"[ScheduleAPI] 正在获取 {target_month}月度计划...")
return await get_active_plans_for_month(target_month)
logger.debug(f"[ScheduleAPI] 正在获取 {target_date}{start_time}{end_time}活动...")
schedule_data = await _get_schedule_from_db(target_date)
if not schedule_data:
return None
start = datetime.strptime(start_time, "%H:%M").time()
end = datetime.strptime(end_time, "%H:%M").time()
activities_in_range = []
for event in schedule_data:
time_range = event.get("time_range")
if not time_range:
continue
try:
event_start_str, event_end_str = time_range.split("-")
event_start = datetime.strptime(event_start_str.strip(), "%H:%M").time()
if start <= event_start < end:
activities_in_range.append(event)
except (ValueError, KeyError):
continue
if formatted:
return _format_schedule_list(activities_in_range, format_template, "schedule")
return activities_in_range
except Exception as e:
logger.error(f"[ScheduleAPI] 获取 {target_month} 月度计划失败: {e}")
return []
logger.error(f"[ScheduleAPI] 获取时间段内活动失败: {e}")
return None
@staticmethod
async def ensure_monthly_plans(target_month: str | None = None) -> bool:
"""(异步) 确保指定月份存在月度计划,如果不存在则触发生成
async def get_monthly_plans(
target_month: str | None = None,
random_count: int | None = None,
formatted: bool = False,
format_template: str = "- {plan_text}",
) -> list[MonthlyPlan] | str | None:
"""
(异步) 获取指定月份的有效月度计划。
Args:
target_month (Optional[str]): 目标月份,格式 "YYYY-MM"。如果为None则使用当前月份。
target_month (Optional[str]): 目标月份,格式 "YYYY-MM"。如果为None则使用当前月份。
random_count (Optional[int]): 如果设置,将随机返回指定数量的计划。
formatted (bool): 如果为True返回格式化的字符串否则返回对象列表。
format_template (str): 当 formatted=True 时使用的格式化模板。
Returns:
bool: 操作是否成功 (如果已存在或成功生成)
Union[List[MonthlyPlan], str, None]: 月度计划列表、格式化字符串或None。
"""
if target_month is None:
target_month = datetime.now().strftime("%Y-%m")
month = target_month or datetime.now().strftime("%Y-%m")
try:
logger.info(f"[ScheduleAPI] 正在确保 {target_month} 的月度计划存在...")
return await schedule_manager.plan_manager.ensure_and_generate_plans_if_needed(target_month)
logger.debug(f"[ScheduleAPI] 正在获取 {month} 的月度计划...")
plans = await get_active_plans_for_month(month)
if not plans:
return None
if random_count is not None and random_count > 0 and len(plans) > random_count:
plans = random.sample(plans, random_count)
if formatted:
return _format_schedule_list(plans, format_template, "plan")
return plans
except Exception as e:
logger.error(f"[ScheduleAPI] 确保 {target_month} 月度计划失败: {e}")
return False
logger.error(f"[ScheduleAPI] 获取 {month} 月度计划失败: {e}")
return None
@staticmethod
async def archive_monthly_plans(target_month: str | None = None) -> bool:
"""(异步) 归档指定月份的月度计划
async def count_monthly_plans(target_month: str | None = None) -> int:
"""
(异步) 获取指定月份的有效月度计划总数。
Args:
target_month (Optional[str]): 目标月份,格式 "YYYY-MM"。如果为None则使用当前月份。
target_month (Optional[str]): 目标月份,格式 "YYYY-MM"。如果为None则使用当前月份。
Returns:
bool: 操作是否成功
int: 有效月度计划的数量。
"""
if target_month is None:
target_month = datetime.now().strftime("%Y-%m")
month = target_month or datetime.now().strftime("%Y-%m")
try:
logger.info(f"[ScheduleAPI] 正在归档 {target_month} 的月度计划...")
await schedule_manager.plan_manager.archive_current_month_plans(target_month)
return True
logger.debug(f"[ScheduleAPI] 正在统计 {month} 的月度计划数量...")
async with get_db_session() as session:
result = await session.execute(
select(func.count(MonthlyPlan.id)).where(
MonthlyPlan.target_month == month, MonthlyPlan.status == "active"
)
)
return result.scalar_one() or 0
except Exception as e:
logger.error(f"[ScheduleAPI] 归档 {target_month} 月度计划失败: {e}")
return False
logger.error(f"[ScheduleAPI] 统计 {month} 月度计划数量失败: {e}")
return 0
# =============================================================================
# 模块级别的便捷函数 (全部为异步)
# =============================================================================
async def get_today_schedule() -> list[dict[str, Any]] | None:
"""(异步) 获取今天的日程安排的便捷函数"""
return await ScheduleAPI.get_today_schedule()
async def get_schedule(
date: str | None = None,
formatted: bool = False,
format_template: str = "{time_range}: {activity}",
) -> list[dict[str, Any]] | str | None:
"""(异步) 获取指定日期的日程安排的便捷函数。"""
return await ScheduleAPI.get_schedule(date, formatted, format_template)
async def get_current_activity() -> str | None:
"""(异步) 获取当前正在进行的活动的便捷函数"""
return await ScheduleAPI.get_current_activity()
async def get_current_activity(
formatted: bool = False,
format_template: str = "{time_range}: {activity}",
) -> dict[str, Any] | str | None:
"""(异步) 获取当前正在进行的活动的便捷函数。"""
return await ScheduleAPI.get_current_activity(formatted, format_template)
async def regenerate_schedule() -> bool:
"""(异步) 触发后台重新生成今天的日程的便捷函数"""
return await ScheduleAPI.regenerate_schedule()
async def get_activities_between(
start_time: str,
end_time: str,
date: str | None = None,
formatted: bool = False,
format_template: str = "{time_range}: {activity}",
) -> list[dict[str, Any]] | str | None:
"""(异步) 获取指定时间范围内活动的便捷函数。"""
return await ScheduleAPI.get_activities_between(start_time, end_time, date, formatted, format_template)
async def get_monthly_plans(target_month: str | None = None) -> list[MonthlyPlan]:
"""(异步) 获取指定月份的有效月度计划的便捷函数"""
return await ScheduleAPI.get_monthly_plans(target_month)
async def get_monthly_plans(
target_month: str | None = None,
random_count: int | None = None,
formatted: bool = False,
format_template: str = "- {plan_text}",
) -> list[MonthlyPlan] | str | None:
"""(异步) 获取月度计划的便捷函数。"""
return await ScheduleAPI.get_monthly_plans(target_month, random_count, formatted, format_template)
async def ensure_monthly_plans(target_month: str | None = None) -> bool:
"""(异步) 确保指定月份存在月度计划的便捷函数"""
return await ScheduleAPI.ensure_monthly_plans(target_month)
async def archive_monthly_plans(target_month: str | None = None) -> bool:
"""(异步) 归档指定月份的月度计划的便捷函数"""
return await ScheduleAPI.archive_monthly_plans(target_month)
async def count_monthly_plans(target_month: str | None = None) -> int:
"""(异步) 获取月度计划总数的便捷函数"""
return await ScheduleAPI.count_monthly_plans(target_month)

View File

@@ -0,0 +1,167 @@
"""
@File : storage_api.py
@Time : 2025/10/25 11:03:15
@Author : 墨墨
@Version : 2.0
@Desc : 提供给插件使用的本地存储API集成版
"""
import json
import os
import threading
from typing import Any
from src.common.logger import get_logger
# 获取日志记录器
logger = get_logger("PluginStorageManager")
# --- 核心管理器部分 ---
class PluginStorageManager:
"""
一个用于管理插件本地JSON数据存储的类。
它处理文件的读写、数据缓存以及线程安全,确保每个插件实例的独立性。
现在它和API住在一起了希望它们能和睦相处。
"""
_instances: dict[str, "PluginStorage"] = {}
_lock = threading.Lock()
_base_path = os.path.join("data", "plugin_data")
@classmethod
def get_storage(cls, name: str) -> "PluginStorage":
"""
获取指定名称的插件存储实例的工厂方法。
"""
with cls._lock:
if name not in cls._instances:
logger.info(f"为插件 '{name}' 创建新的本地存储实例。")
cls._instances[name] = PluginStorage(name, cls._base_path)
else:
logger.debug(f"从缓存中获取插件 '{name}' 的本地存储实例。")
return cls._instances[name]
# --- 单个存储实例部分 ---
class PluginStorage:
"""
单个插件的本地存储操作类。
提供了多种方法来读取、写入和修改JSON文件中的数据。
把数据交给我,你就放心吧。
"""
def __init__(self, name: str, base_path: str):
self.name = name
safe_filename = "".join(c for c in name if c.isalnum() or c in ("_", "-")).rstrip()
self.file_path = os.path.join(base_path, f"{safe_filename}.json")
self._data: dict[str, Any] = {}
self._lock = threading.Lock()
self._ensure_directory_exists()
self._load_data()
def _ensure_directory_exists(self) -> None:
try:
directory = os.path.dirname(self.file_path)
if not os.path.exists(directory):
logger.info(f"存储目录 '{directory}' 不存在,正在创建...")
os.makedirs(directory)
logger.info(f"目录 '{directory}' 创建成功。")
except Exception as e:
logger.error(f"创建存储目录时发生错误: {e}", exc_info=True)
raise
def _load_data(self) -> None:
with self._lock:
try:
if os.path.exists(self.file_path):
with open(self.file_path, encoding="utf-8") as f:
content = f.read()
self._data = json.loads(content) if content else {}
else:
self._data = {}
except (json.JSONDecodeError, Exception) as e:
logger.warning(f"'{self.file_path}' 加载数据失败: {e},将初始化为空数据。")
self._data = {}
def _save_data(self) -> None:
with self._lock:
try:
with open(self.file_path, "w", encoding="utf-8") as f:
json.dump(self._data, f, indent=4, ensure_ascii=False)
except Exception as e:
logger.error(f"'{self.file_path}' 保存数据时发生错误: {e}", exc_info=True)
raise
def get(self, key: str, default: Any | None = None) -> Any:
return self._data.get(key, default)
def set(self, key: str, value: Any) -> None:
"""
设置一个键值对。
如果键已存在,则覆盖它的值;如果不存在,则创建新的键值对。
这是“设置”或“更新”操作。
"""
logger.debug(f"'{self.name}' 存储中设置值: key='{key}'")
self._data[key] = value
self._save_data()
def add(self, key: str, value: Any) -> bool:
"""
添加一个新的键值对。
只有当键不存在时,才会添加成功。如果键已存在,则不进行任何操作。
这是专门的“新增”操作,满足你的要求了吧,主人?
Returns:
bool: 如果成功添加则返回 True如果键已存在则返回 False。
"""
if key not in self._data:
logger.debug(f"'{self.name}' 存储中新增值: key='{key}'")
self._data[key] = value
self._save_data()
return True
logger.warning(f"尝试为已存在的键 '{key}' 新增值,操作被忽略。")
return False
def update(self, data: dict[str, Any]) -> None:
self._data.update(data)
self._save_data()
def delete(self, key: str) -> bool:
if key in self._data:
del self._data[key]
self._save_data()
return True
return False
def get_all(self) -> dict[str, Any]:
return self._data.copy()
def clear(self) -> None:
logger.warning(f"插件 '{self.name}' 的本地存储将被清空!")
self._data = {}
self._save_data()
# --- 对外暴露的API函数 ---
def get_local_storage(name: str) -> "PluginStorage":
"""
获取一个专属于插件的本地存储实例。
这是插件与本地存储功能交互的唯一入口。
"""
if not isinstance(name, str) or not name:
logger.error("获取本地存储失败:插件名称(name)必须是一个非空字符串。")
raise ValueError("插件名称(name)不能为空字符串。")
try:
storage_instance = PluginStorageManager.get_storage(name)
return storage_instance
except Exception as e:
logger.critical(f"为插件 '{name}' 提供本地存储实例时发生严重错误: {e}", exc_info=True)
raise

View File

@@ -135,6 +135,11 @@ class BasePlugin(PluginBase):
components = self.get_plugin_components()
# 检查依赖
if not self._check_dependencies():
logger.error(f"{self.log_prefix} 依赖检查失败,跳过注册")
return False
# 注册所有组件
registered_components = []
for component_info, component_class in components:

View File

@@ -3,7 +3,7 @@ from typing import Any
from src.chat.utils.prompt_params import PromptParameters
from src.common.logger import get_logger
from src.plugin_system.base.component_types import ComponentType, InjectionRule, PromptInfo
from src.plugin_system.base.component_types import ComponentType, PromptInfo
logger = get_logger("base_prompt")
@@ -16,7 +16,7 @@ class BasePrompt(ABC):
子类可以通过类属性定义其行为:
- prompt_name: Prompt组件的唯一名称。
- injection_rules: 定义注入规则的列表。
- injection_point: 指定要注入的目标Prompt名称或名称列表
"""
prompt_name: str = ""
@@ -24,15 +24,11 @@ class BasePrompt(ABC):
prompt_description: str = ""
"""Prompt组件的描述"""
# 定义此组件希望如何注入到核心Prompt中
# 是一个 InjectionRule 对象的列表,可以实现复杂的注入逻辑
# 例如: [InjectionRule(target_prompt="planner_prompt", injection_type=InjectionType.APPEND, priority=50)]
injection_rules: list[InjectionRule] = []
"""定义注入规则的列表"""
# 旧的注入点定义,用于向后兼容。如果定义了这个,它将被自动转换为 injection_rules。
injection_point: str | list[str] | None = None
"""[已废弃] 要注入的目标Prompt名称或列表请使用 injection_rules"""
# 定义此组件希望注入到哪个或哪些核心Prompt中
# 可以是一个字符串(单个目标)或字符串列表(多个目标)
# 例如: "planner_prompt" 或 ["s4u_style_prompt", "normal_style_prompt"]
injection_point: str | list[str] = ""
"""要注入的目标Prompt名称或列表"""
def __init__(self, params: PromptParameters, plugin_config: dict | None = None):
"""初始化Prompt组件
@@ -91,11 +87,9 @@ class BasePrompt(ABC):
if not cls.prompt_name:
raise ValueError("Prompt组件必须定义 'prompt_name' 类属性。")
# 同时传递新旧两种定义PromptInfo的__post_init__将处理兼容性问题
return PromptInfo(
name=cls.prompt_name,
component_type=ComponentType.PROMPT,
description=cls.prompt_description,
injection_rules=cls.injection_rules,
injection_point=cls.injection_point,
)

View File

@@ -2,38 +2,6 @@ from dataclasses import dataclass, field
from enum import Enum
from typing import Any
class InjectionType(Enum):
"""Prompt注入类型枚举"""
PREPEND = "prepend" # 在开头添加
APPEND = "append" # 在末尾添加
REPLACE = "replace" # 替换指定内容
REMOVE = "remove" # 删除指定内容
INSERT_AFTER = "insert_after" # 在指定内容之后插入
def __str__(self) -> str:
return self.value
@dataclass
class InjectionRule:
"""Prompt注入规则"""
target_prompt: str # 目标Prompt的名称
injection_type: InjectionType = InjectionType.PREPEND # 注入类型
priority: int = 100 # 优先级,数字越小越先执行
target_content: str | None = None # 用于REPLACE、REMOVE和INSERT_AFTER操作的目标内容支持正则表达式
def __post_init__(self):
if self.injection_type in [
InjectionType.REPLACE,
InjectionType.REMOVE,
InjectionType.INSERT_AFTER,
] and self.target_content is None:
raise ValueError(f"'{self.injection_type.value}'类型的注入规则必须提供 'target_content'")
from maim_message import Seg
from src.llm_models.payload_content.tool_option import ToolCall as ToolCall
@@ -166,7 +134,7 @@ class ComponentInfo:
@dataclass
class ActionInfo(ComponentInfo):
"""动作组件信息
注意:激活类型相关字段已废弃,推荐使用 Action 类的 go_activate() 方法来自定义激活逻辑。
这些字段将继续保留以提供向后兼容性BaseAction.go_activate() 的默认实现会使用这些字段。
"""
@@ -303,30 +271,13 @@ class EventInfo(ComponentInfo):
class PromptInfo(ComponentInfo):
"""Prompt组件信息"""
injection_rules: list[InjectionRule] = field(default_factory=list)
"""定义此组件如何注入到其他Prompt"""
# 旧的injection_point用于向后兼容
injection_point: str | list[str] | None = None
injection_point: str | list[str] = ""
"""要注入的目标Prompt名称或列表"""
def __post_init__(self):
super().__post_init__()
self.component_type = ComponentType.PROMPT
# 向后兼容逻辑:如果定义了旧的 injection_point则自动转换为新的 injection_rules
if self.injection_point:
if not self.injection_rules: # 仅当rules为空时转换
points = []
if isinstance(self.injection_point, str):
points.append(self.injection_point)
elif isinstance(self.injection_point, list):
points = self.injection_point
for point in points:
self.injection_rules.append(InjectionRule(target_prompt=point))
# 转换后可以清空旧字段,避免混淆
self.injection_point = None
@dataclass
class PluginInfo:
@@ -341,7 +292,7 @@ class PluginInfo:
is_built_in: bool = False # 是否为内置插件
components: list[ComponentInfo] = field(default_factory=list) # 包含的组件列表
dependencies: list[str] = field(default_factory=list) # 依赖的其他插件
python_dependencies: list[str | PythonDependency] = field(default_factory=list) # Python包依赖
python_dependencies: list[PythonDependency] = field(default_factory=list) # Python包依赖
config_file: str = "" # 配置文件路径
metadata: dict[str, Any] = field(default_factory=dict) # 额外元数据
# 新增manifest相关信息

View File

@@ -12,6 +12,7 @@ from src.config.config import CONFIG_DIR
from src.plugin_system.base.component_types import (
PermissionNodeField,
PluginInfo,
PythonDependency,
)
from src.plugin_system.base.config_types import ConfigField
from src.plugin_system.base.plugin_metadata import PluginMetadata
@@ -29,6 +30,8 @@ class PluginBase(ABC):
plugin_name: str
config_file_name: str
enable_plugin: bool = True
dependencies: list[str] = []
python_dependencies: list[str | PythonDependency] = []
config_schema: dict[str, dict[str, ConfigField] | str] = {}
@@ -61,6 +64,12 @@ class PluginBase(ABC):
self.plugin_description = self.plugin_meta.description
self.plugin_author = self.plugin_meta.author
# 标准化Python依赖为PythonDependency对象
normalized_python_deps = self._normalize_python_dependencies(self.python_dependencies)
# 检查Python依赖
self._check_python_dependencies(normalized_python_deps)
# 创建插件信息对象
self.plugin_info = PluginInfo(
name=self.plugin_name,
@@ -71,8 +80,8 @@ class PluginBase(ABC):
enabled=self._is_enabled,
is_built_in=False,
config_file=self.config_file_name or "",
dependencies=self.plugin_meta.dependencies.copy(),
python_dependencies=self.plugin_meta.python_dependencies.copy(),
dependencies=self.dependencies.copy(),
python_dependencies=normalized_python_deps,
)
logger.debug(f"{self.log_prefix} 插件基类初始化完成")
@@ -358,6 +367,20 @@ class PluginBase(ABC):
self._is_enabled = self.config["plugin"]["enabled"]
logger.info(f"{self.log_prefix} 从配置更新插件启用状态: {self._is_enabled}")
def _check_dependencies(self) -> bool:
"""检查插件依赖"""
from src.plugin_system.core.component_registry import component_registry
if not self.dependencies:
return True
for dep in self.dependencies:
if not component_registry.get_plugin_info(dep):
logger.error(f"{self.log_prefix} 缺少依赖插件: {dep}")
return False
return True
def get_config(self, key: str, default: Any = None) -> Any:
"""获取插件配置值,支持嵌套键访问
@@ -380,6 +403,61 @@ class PluginBase(ABC):
return current
def _normalize_python_dependencies(self, dependencies: Any) -> list[PythonDependency]:
"""将依赖列表标准化为PythonDependency对象"""
from packaging.requirements import Requirement
normalized = []
for dep in dependencies:
if isinstance(dep, str):
try:
# 尝试解析为requirement格式 (如 "package>=1.0.0")
req = Requirement(dep)
version_spec = str(req.specifier) if req.specifier else ""
normalized.append(
PythonDependency(
package_name=req.name,
version=version_spec,
install_name=dep, # 保持原始的安装名称
)
)
except Exception:
# 如果解析失败,作为简单包名处理
normalized.append(PythonDependency(package_name=dep, install_name=dep))
elif isinstance(dep, PythonDependency):
normalized.append(dep)
else:
logger.warning(f"{self.log_prefix} 未知的依赖格式: {dep}")
return normalized
def _check_python_dependencies(self, dependencies: list[PythonDependency]) -> bool:
"""检查Python依赖并尝试自动安装"""
if not dependencies:
logger.info(f"{self.log_prefix} 无Python依赖需要检查")
return True
try:
# 延迟导入以避免循环依赖
from src.plugin_system.utils.dependency_manager import get_dependency_manager
dependency_manager = get_dependency_manager()
success, errors = dependency_manager.check_and_install_dependencies(dependencies, self.plugin_name)
if success:
logger.info(f"{self.log_prefix} Python依赖检查通过")
return True
else:
logger.error(f"{self.log_prefix} Python依赖检查失败:")
for error in errors:
logger.error(f"{self.log_prefix} - {error}")
return False
except Exception as e:
logger.error(f"{self.log_prefix} Python依赖检查时发生异常: {e}", exc_info=True)
return False
@abstractmethod
def register_plugin(self) -> bool:
"""

View File

@@ -1,8 +1,6 @@
from dataclasses import dataclass, field
from typing import Any
from src.plugin_system.base.component_types import PythonDependency
@dataclass
class PluginMetadata:
@@ -25,9 +23,5 @@ class PluginMetadata:
keywords: list[str] = field(default_factory=list) # 关键词
categories: list[str] = field(default_factory=list) # 分类
# 依赖关系
dependencies: list[str] = field(default_factory=list) # 插件依赖
python_dependencies: list[str | PythonDependency] = field(default_factory=list) # Python包依赖
# 扩展字段
extra: dict[str, Any] = field(default_factory=dict) # 其他任意信息

View File

@@ -66,7 +66,7 @@ class PlusCommand(ABC):
# 验证聊天类型限制
if not self._validate_chat_type():
is_group = hasattr(self.message, "is_group_message") and self.message.is_group_message
is_group = self.message.message_info.group_info.group_id
logger.warning(
f"{self.log_prefix} 命令 '{self.command_name}' 不支持当前聊天类型: "
f"{'群聊' if is_group else '私聊'}, 允许类型: {self.chat_type_allow.value}"
@@ -74,11 +74,11 @@ class PlusCommand(ABC):
def _parse_command(self) -> None:
"""解析命令和参数"""
if not hasattr(self.message, "plain_text") or not self.message.plain_text:
if not hasattr(self.message, "processed_plain_text") or not self.message.processed_plain_text:
self.args = CommandArgs("")
return
plain_text = self.message.plain_text.strip()
plain_text = self.message.processed_plain_text.strip()
# 获取配置的命令前缀
prefixes = global_config.command.command_prefixes
@@ -152,10 +152,10 @@ class PlusCommand(ABC):
def _is_exact_command_call(self) -> bool:
"""检查是否是精确的命令调用(无参数)"""
if not hasattr(self.message, "plain_text") or not self.message.plain_text:
if not hasattr(self.message, "plain_text") or not self.message.processed_plain_text:
return False
plain_text = self.message.plain_text.strip()
plain_text = self.message.processed_plain_text.strip()
# 获取配置的命令前缀
prefixes = global_config.command.command_prefixes
@@ -435,3 +435,4 @@ def create_plus_command_adapter(plus_command_class):
# 兼容旧的命名
PlusCommandAdapter = create_plus_command_adapter

View File

@@ -323,33 +323,6 @@ class PluginManager:
init_module = module_from_spec(init_spec)
init_spec.loader.exec_module(init_module)
# --- 在这里进行依赖检查 ---
if hasattr(init_module, "__plugin_meta__"):
metadata = getattr(init_module, "__plugin_meta__")
from src.plugin_system.utils.dependency_manager import get_dependency_manager
dependency_manager = get_dependency_manager()
# 1. 检查Python依赖
if metadata.python_dependencies:
success, errors = dependency_manager.check_and_install_dependencies(
metadata.python_dependencies, metadata.name
)
if not success:
error_msg = f"Python依赖检查失败: {', '.join(errors)}"
self.failed_plugins[plugin_name] = error_msg
logger.error(f"❌ 插件加载失败: {plugin_name} - {error_msg}")
return None # 依赖检查失败,不加载该模块
# 2. 检查插件依赖
if not self._check_plugin_dependencies(metadata):
error_msg = f"插件依赖检查失败: 请确保依赖 {metadata.dependencies} 已正确安装并加载。"
self.failed_plugins[plugin_name] = error_msg
logger.error(f"❌ 插件加载失败: {plugin_name} - {error_msg}")
return None # 插件依赖检查失败
# --- 依赖检查逻辑结束 ---
# 然后加载 plugin.py
spec = spec_from_file_location(module_name, plugin_file)
if spec is None or spec.loader is None:
@@ -362,8 +335,7 @@ class PluginManager:
# 将 __plugin_meta__ 从 init_module 附加到主模块
if init_module and hasattr(init_module, "__plugin_meta__"):
metadata = getattr(init_module, "__plugin_meta__")
setattr(module, "__plugin_meta__", metadata)
setattr(module, "__plugin_meta__", getattr(init_module, "__plugin_meta__"))
logger.debug(f"插件模块加载成功: {plugin_file} -> {plugin_name} ({plugin_dir})")
return module
@@ -374,20 +346,6 @@ class PluginManager:
self.failed_plugins[plugin_name if "plugin_name" in locals() else module_name] = error_msg
return None
def _check_plugin_dependencies(self, plugin_meta: PluginMetadata) -> bool:
"""检查插件的插件依赖"""
dependencies = plugin_meta.dependencies
if not dependencies:
return True
for dep_name in dependencies:
# 检查依赖的插件类是否已注册
if dep_name not in self.plugin_classes:
logger.error(f"插件 '{plugin_meta.name}' 缺少依赖: 插件 '{dep_name}' 未找到或加载失败。")
return False
logger.debug(f"插件 '{plugin_meta.name}' 的所有依赖都已找到。")
return True
# == 显示统计与插件信息 ==
def _show_stats(self, total_registered: int, total_failed_registration: int):

View File

@@ -60,7 +60,7 @@ class ChatterPlanFilter:
prompt, used_message_id_list = await self._build_prompt(plan)
plan.llm_prompt = prompt
if global_config.debug.show_prompt:
logger.info(f"规划器原始提示词:{prompt}") #叫你不要改你耳朵聋吗😡😡😡😡😡
logger.debug(f"规划器原始提示词:{prompt}")
llm_content, _ = await self.planner_llm.generate_response_async(prompt=prompt)

View File

@@ -162,6 +162,16 @@ class MessageHandler:
)
logger.debug(f"原始消息内容: {raw_message.get('message', [])}")
# 检查是否包含@或video消息段
message_segments = raw_message.get("message", [])
if message_segments:
for i, seg in enumerate(message_segments):
seg_type = seg.get("type")
if seg_type in ["at", "video"]:
logger.info(f"检测到 {seg_type.upper()} 消息段 [{i}]: {seg}")
elif seg_type not in ["text", "face", "image"]:
logger.warning(f"检测到特殊消息段 [{i}]: type={seg_type}, data={seg.get('data', {})}")
message_type: str = raw_message.get("message_type")
message_id: int = raw_message.get("message_id")
# message_time: int = raw_message.get("time")

View File

@@ -237,6 +237,7 @@ class SendHandler:
target_id = str(target_id)
if target_id == "notice":
return payload
logger.info(target_id if isinstance(target_id, str) else "")
new_payload = self.build_payload(
payload,
await self.handle_reply_message(target_id if isinstance(target_id, str) else "", user_info),
@@ -321,7 +322,7 @@ class SendHandler:
# 如果没有获取到被回复者的ID则直接返回不进行@
if not replied_user_id:
logger.warning(f"无法获取消息 {id} 的发送者信息,跳过 @")
logger.debug(f"最终返回的回复段: {reply_seg}")
logger.info(f"最终返回的回复段: {reply_seg}")
return reply_seg
# 根据概率决定是否艾特用户
@@ -339,7 +340,7 @@ class SendHandler:
logger.info(f"最终返回的回复段: {reply_seg}")
return reply_seg
logger.debug(f"最终返回的回复段: {reply_seg}")
logger.info(f"最终返回的回复段: {reply_seg}")
return reply_seg
def handle_text_message(self, message: str) -> dict:

View File

@@ -21,6 +21,8 @@ from src.plugin_system.apis import (
send_api,
)
from .prompts import DECISION_PROMPT, PLAN_PROMPT
logger = get_logger(__name__)
@@ -80,7 +82,51 @@ class ProactiveThinkerExecutor:
)
logger.info(f"决策结果为:回复。话题: {topic}")
plan_prompt = self._build_plan_prompt(context, start_mode, topic, reason)
# 根据聊天类型构建特定上下文
if context["chat_type"] == "private":
user_info = context["user_info"]
relationship = context["relationship"]
target_user_or_group = f"你的朋友 '{user_info.user_nickname}'"
context_specific_block = f"""
1. **你的日程**:
{context["schedule_context"]}
2. **你和Ta的关系**:
- 详细印象: {relationship["impression"]}
- 好感度: {relationship["attitude"]}/100
3. **最近的聊天摘要**:
{context["recent_chat_history"]}
4. **你最近的相关动作**:
{context["action_history_context"]}
"""
else: # group
group_info = context["group_info"]
target_user_or_group = f"群聊 '{group_info['group_name']}'"
context_specific_block = f"""
1. **你的日程**:
{context["schedule_context"]}
2. **群聊信息**:
- 群名称: {group_info["group_name"]}
3. **最近的聊天摘要**:
{context["recent_chat_history"]}
4. **你最近的相关动作**:
{context["action_history_context"]}
"""
plan_prompt = PLAN_PROMPT.format(
bot_nickname=global_config.bot.nickname,
persona_core=context["persona"]["core"],
persona_side=context["persona"]["side"],
identity=context["persona"]["identity"],
current_time=context["current_time"],
target_user_or_group=target_user_or_group,
reason=reason,
topic=topic,
context_specific_block=context_specific_block,
mood_state=context["mood_state"],
)
if global_config.debug.show_prompt:
logger.info(f"主动思考回复器原始提示词:{plan_prompt}")
is_success, response, _, _ = await llm_api.generate_with_model(
prompt=plan_prompt, model_config=model_config.model_task_config.replyer
@@ -222,150 +268,54 @@ class ProactiveThinkerExecutor:
logger.warning(f"Stream {stream_id} 既没有 group_info 也没有 user_info")
return None
def _build_decision_prompt(self, context: dict[str, Any], start_mode: str) -> str:
"""
根据收集到的上下文信息,构建用于决策的提示词。
Args:
context: 包含所有上下文信息的字典。
start_mode: 启动模式 ('cold_start''wake_up')。
Returns:
构建完成的决策提示词字符串。
"""
chat_type = context["chat_type"]
persona = context["persona"]
# 构建通用头部
prompt = f"""
# 角色
你的名字是{global_config.bot.nickname},你的人设如下:
- 核心人设: {persona["core"]}
- 侧面人设: {persona["side"]}
- 身份: {persona["identity"]}
你的当前情绪状态是: {context["mood_state"]}
# 你最近的相关决策历史 (供参考)
{context["action_history_context"]}
"""
# 根据聊天类型构建任务和情境
if chat_type == "private":
user_info = context["user_info"]
relationship = context["relationship"]
prompt += f"""
# 任务
现在是 {context["current_time"]},你需要根据当前的情境,决定是否要主动向用户 '{user_info.user_nickname}' 发起对话。
# 情境分析
1. **启动模式**: {start_mode} ({"初次见面/很久未见" if start_mode == "cold_start" else "日常唤醒"})
2. **你的日程**:
{context["schedule_context"]}
3. **你和Ta的关系**:
- 简短印象: {relationship["short_impression"]}
- 详细印象: {relationship["impression"]}
- 好感度: {relationship["attitude"]}/100
4. **和Ta在别处的讨论摘要**:
{context["cross_context_block"]}
5. **最近的聊天摘要**:
{context["recent_chat_history"]}
"""
elif chat_type == "group":
group_info = context["group_info"]
prompt += f"""
# 任务
现在是 {context["current_time"]},你需要根据当前的情境,决定是否要主动向群聊 '{group_info["group_name"]}' 发起对话。
# 情境分析
1. **启动模式**: {start_mode} ({"首次加入/很久未发言" if start_mode == "cold_start" else "日常唤醒"})
2. **你的日程**:
{context["schedule_context"]}
3. **群聊信息**:
- 群名称: {group_info["group_name"]}
4. **最近的聊天摘要**:
{context["recent_chat_history"]}
"""
# 构建通用尾部
prompt += """
# 决策目标
你的最终目标是根据你的角色和当前情境,做出一个最符合人类社交直觉的决策,以求:
- **(私聊)深化关系**: 通过展现你的关心、记忆和个性来拉近与对方的距离。
- **(群聊)活跃气氛**: 提出能引起大家兴趣的话题,促进群聊的互动。
- **提供价值**: 你的出现应该是有意义的,无论是情感上的温暖,还是信息上的帮助。
- **保持自然**: 避免任何看起来像机器人或骚扰的行为。
# 决策指令
请综合以上所有信息以稳定、真实、拟人的方式做出决策。你的决策需要以JSON格式输出包含以下字段
- `should_reply`: bool, 是否应该发起对话。
- `topic`: str, 如果 `should_reply` 为 true你打算聊什么话题
- `reason`: str, 做出此决策的简要理由,需体现你对上述目标的考量。
# 决策流程与核心原则
1. **检查对话状态**:
- **最后发言者**: 查看【最近的聊天摘要】。如果最后一条消息是你发的,且对方尚未回复,**通常应选择不回复**。这是最重要的原则,以避免打扰。
- **例外**: 只有在等待时间足够长(例如超过数小时),或者你有非常重要且有时效性的新话题(例如,“你昨晚说的那个电影我刚看了!”)时,才考虑再次发言。
- **无人发言**: 如果最近的聊天记录里只有你一个人在说话,**绝对不要回复**,以防刷屏。
2. **寻找话题切入点 (如果可以回复)**:
- **强关联优先**: 优先从【情境分析】中寻找最自然、最相关的话题。顺序建议:`最近的聊天摘要` > `你和Ta的关系` > `你的日程`。一个好的话题往往是对最近对话的延续。
- **展现个性**: 结合你的【人设】和【情绪】,思考你会如何看待这些情境信息,并从中找到话题。例如,如果你是一个活泼的人,看到对方日程很满,可以说:“看你今天日程满满,真是活力四射的一天呀!”
- **备选方案**: 如果实在没有强关联的话题,可以发起一个简单的日常问候,如“在吗?”或“下午好”。
3. **最终决策**:
- **权衡频率**: 查看【你最近的相关决策历史】。如果你在短时间内已经主动发起过多次对话,即使现在有话题,也应倾向于**不回复**,保持一定的社交距离。
- **质量胜于数量**: 宁可错过一次普通的互动机会,也不要进行一次尴尬或生硬的对话。
---
示例1 (基于上下文):
{{
"should_reply": true,
"topic": "关心一下Ta昨天提到的那个项目进展如何了",
"reason": "用户昨天在聊天中提到了一个重要的项目,现在主动关心一下进展,会显得很体贴,也能自然地开启对话。"
}}
示例2 (简单问候):
{{
"should_reply": true,
"topic": "打个招呼问问Ta现在在忙些什么",
"reason": "最近没有聊天记录,日程也很常规,没有特别的切入点。一个简单的日常问候是最安全和自然的方式来重新连接。"
}}
示例3 (不应回复 - 过于频繁):
{{
"should_reply": false,
"topic": null,
"reason": "虽然群里很活跃,但现在是深夜,而且最近的聊天话题我也不熟悉,没有合适的理由去打扰大家。"
}}
示例4 (不应回复 - 等待回应):
{{
"should_reply": false,
"topic": null,
"reason": "我注意到上一条消息是我几分钟前主动发送的,对方可能正在忙。为了表现出耐心和体贴,我现在最好保持安静,等待对方的回应。"
}}
---
请输出你的决策:
"""
return prompt
async def _make_decision(self, context: dict[str, Any], start_mode: str) -> dict[str, Any] | None:
"""
调用 LLM 进行决策,判断是否应该主动发起对话,以及聊什么话题。
Args:
context: 包含所有上下文信息的字典。
start_mode: 启动模式。
Returns:
一个包含决策结果的字典 (例如: {"should_reply": bool, "topic": str, "reason": str})
如果决策过程失败则返回 None 或包含错误信息的字典。
"""
if context["chat_type"] not in ["private", "group"]:
return {"should_reply": False, "reason": "未知的聊天类型"}
prompt = self._build_decision_prompt(context, start_mode)
# 根据聊天类型构建特定上下文
if context["chat_type"] == "private":
user_info = context["user_info"]
relationship = context["relationship"]
target_user_or_group = f"用户 '{user_info.user_nickname}'"
context_specific_block = f"""
1. **启动模式**: {start_mode} ({"初次见面/很久未见" if start_mode == "cold_start" else "日常唤醒"})
2. **你的日程**:
{context["schedule_context"]}
3. **你和Ta的关系**:
- 简短印象: {relationship["short_impression"]}
- 详细印象: {relationship["impression"]}
- 好感度: {relationship["attitude"]}/100
4. **和Ta在别处的讨论摘要**:
{context["cross_context_block"]}
5. **最近的聊天摘要**:
{context["recent_chat_history"]}
"""
else: # group
group_info = context["group_info"]
target_user_or_group = f"群聊 '{group_info['group_name']}'"
context_specific_block = f"""
1. **启动模式**: {start_mode} ({"首次加入/很久未发言" if start_mode == "cold_start" else "日常唤醒"})
2. **你的日程**:
{context["schedule_context"]}
3. **群聊信息**:
- 群名称: {group_info["group_name"]}
4. **最近的聊天摘要**:
{context["recent_chat_history"]}
"""
prompt = DECISION_PROMPT.format(
bot_nickname=global_config.bot.nickname,
persona_core=context["persona"]["core"],
persona_side=context["persona"]["side"],
identity=context["persona"]["identity"],
mood_state=context["mood_state"],
action_history_context=context["action_history_context"],
current_time=context["current_time"],
target_user_or_group=target_user_or_group,
context_specific_block=context_specific_block,
)
if global_config.debug.show_prompt:
logger.info(f"主动思考决策器原始提示词:{prompt}")
@@ -385,160 +335,3 @@ class ProactiveThinkerExecutor:
except orjson.JSONDecodeError:
logger.error(f"决策LLM返回的JSON格式无效: {response}")
return {"should_reply": False, "reason": "决策模型返回格式错误"}
def _build_private_plan_prompt(self, context: dict[str, Any], start_mode: str, topic: str, reason: str) -> str:
"""
为私聊场景构建生成对话内容的规划提示词。
Args:
context: 上下文信息字典。
start_mode: 启动模式。
topic: 决策模块决定的话题。
reason: 决策模块给出的理由。
Returns:
构建完成的私聊规划提示词字符串。
"""
user_info = context["user_info"]
relationship = context["relationship"]
if start_mode == "cold_start":
return f"""
# 任务
你需要主动向一个新朋友 '{user_info.user_nickname}' 发起对话。这是你们的第一次交流,或者很久没聊了。
# 决策上下文
- **决策理由**: {reason}
# 情境分析
1. **你的日程**:
{context["schedule_context"]}
2. **你和Ta的关系**:
- 简短印象: {relationship["short_impression"]}
- 详细印象: {relationship["impression"]}
- 好感度: {relationship["attitude"]}/100
3. **和Ta在别处的讨论摘要**:
{context["cross_context_block"]}
4. **最近的聊天摘要**:
{context["recent_chat_history"]}
5. **你最近的相关动作**:
{context["action_history_context"]}
# 对话指引
- 你的目标是“破冰”,让对话自然地开始。
- 你应该围绕这个话题展开: {topic}
- 你的语气应该符合你的人设和你当前的心情({context["mood_state"]}),友好且真诚。
"""
else: # wake_up
return f"""
# 任务
现在是 {context["current_time"]},你需要主动向你的朋友 '{user_info.user_nickname}' 发起对话。
# 决策上下文
- **决策理由**: {reason}
# 情境分析
1. **你的日程**:
{context["schedule_context"]}
2. **你和Ta的关系**:
- 详细印象: {relationship["impression"]}
- 好感度: {relationship["attitude"]}/100
3. **最近的聊天摘要**:
{context["recent_chat_history"]}
4. **你最近的相关动作**:
{context["action_history_context"]}
# 对话指引
- 你决定和Ta聊聊关于“{topic}”的话题。
- **对话风格**:
- **自然开场**: 你可以根据话题和情境,选择最自然的开场方式。可以直接切入话题(如果话题关联性很强),也可以先用一句简单的问候(如“在吗?”、“下午好”)作为过渡。**不要总是使用同一种开场白**。
- **融合情境**: 将【情境分析】中的信息如你的心情、日程、对Ta的印象巧妙地融入到对话中让你的话语听起来更真实、更有依据。
- **符合人设**: 你的语气、用词、甚至表情符号的使用,都应该完全符合你的【角色】设定和当前【情绪】({context["mood_state"]})以及你对Ta的好感度。
- 请结合以上所有情境信息,自然地开启对话。
"""
def _build_group_plan_prompt(self, context: dict[str, Any], topic: str, reason: str) -> str:
"""
为群聊场景构建生成对话内容的规划提示词。
Args:
context: 上下文信息字典。
topic: 决策模块决定的话题。
reason: 决策模块给出的理由。
Returns:
构建完成的群聊规划提示词字符串。
"""
group_info = context["group_info"]
return f"""
# 任务
现在是 {context["current_time"]},你需要主动向群聊 '{group_info["group_name"]}' 发起对话。
# 决策上下文
- **决策理由**: {reason}
# 情境分析
1. **你的日程**:
你当前的心情({context["mood_state"]}
{context["schedule_context"]}
2. **群聊信息**:
- 群名称: {group_info["group_name"]}
3. **最近的聊天摘要**:
{context["recent_chat_history"]}
4. **你最近的相关动作**:
{context["action_history_context"]}
# 对话指引
- 你决定和大家聊聊关于“{topic}”的话题。
- **对话风格**:
- **自然开场**: 你可以根据话题和情境,选择最自然的开场方式。可以直接切入话题(如果话题关联性很强),也可以先用一句简单的问候(如“哈喽,大家好呀~”、“下午好!”)作为过渡。**不要总是使用同一种开场白**。
- **融合情境**: 将【情境分析】中的信息(如你的心情、日程)巧妙地融入到对话中,让你的话语听起来更真实、更有依据。
- **符合人设**: 你的语气、用词、甚至表情符号的使用,都应该完全符合你的【角色】设定和当前【情绪】({context["mood_state"]})。语气应该更活泼、更具包容性,以吸引更多群成员参与讨论。
- 请结合以上所有情境信息,自然地开启对话。
- 可以分享你的看法、提出相关问题,或者开个合适的玩笑。
"""
def _build_plan_prompt(self, context: dict[str, Any], start_mode: str, topic: str, reason: str) -> str:
"""
根据聊天类型、启动模式和决策结果,构建最终生成对话内容的规划提示词。
Args:
context: 上下文信息字典。
start_mode: 启动模式。
topic: 决策模块决定的话题。
reason: 决策模块给出的理由。
Returns:
最终的规划提示词字符串。
"""
persona = context["persona"]
chat_type = context["chat_type"]
# 1. 构建通用角色头部
prompt = f"""
# 角色
你的名字是{global_config.bot.nickname},你的人设如下:
- 核心人设: {persona["core"]}
- 侧面人设: {persona["side"]}
- 身份: {persona["identity"]}
"""
# 2. 根据聊天类型构建特定内容
if chat_type == "private":
prompt += self._build_private_plan_prompt(context, start_mode, topic, reason)
elif chat_type == "group":
prompt += self._build_group_plan_prompt(context, topic, reason)
# 3. 添加通用结尾
final_instructions = """
# 输出要求
- **简洁**: 不要输出任何多余内容如前缀、后缀、冒号、引号、at/@等)。
- **原创**: 不要重复之前的内容,即使意思相近也不行。
- **直接**: 只输出最终的回复文本本身。
- **风格**: 回复需简短、完整且口语化。
现在,你说:"""
prompt += final_instructions
if global_config.debug.show_prompt:
logger.info(f"主动思考回复器原始提示词:{prompt}")
return prompt

View File

@@ -0,0 +1,97 @@
from src.chat.utils.prompt import Prompt
# =============================================================================
# 决策阶段 (Decision Phase)
# =============================================================================
DECISION_PROMPT = Prompt(
name="proactive_thinker_decision",
template="""
# 角色
你的名字是{bot_nickname},你的人设如下:
- 核心人设: {persona_core}
- 侧面人设: {persona_side}
- 身份: {identity}
你的当前情绪状态是: {mood_state}
# 你最近的相关决策历史 (供参考)
{action_history_context}
# 任务
现在是 {current_time},你需要根据当前的情境,决定是否要主动向{target_user_or_group}发起对话。
# 情境分析
{context_specific_block}
# 决策目标
你的最终目标是根据你的角色和当前情境,做出一个最符合人类社交直觉的决策,以求:
- **(私聊)深化关系**: 通过展现你的关心、记忆和个性来拉近与对方的距离。
- **(群聊)活跃气氛**: 提出能引起大家兴趣的话题,促进群聊的互动。
- **提供价值**: 你的出现应该是有意义的,无论是情感上的温暖,还是信息上的帮助。
- **保持自然**: 避免任何看起来像机器人或骚扰的行为。
# 决策指令
请综合以上所有信息以稳定、真实、拟人的方式做出决策。你的决策需要以JSON格式输出包含以下字段
- `should_reply`: bool, 是否应该发起对话。
- `topic`: str, 如果 `should_reply` 为 true你打算聊什么话题
- `reason`: str, 做出此决策的简要理由,需体现你对上述目标的考量。
# 决策流程与核心原则
1. **检查对话状态**:
- **最后发言者**: 查看【最近的聊天摘要】。如果最后一条消息是你发的,且对方尚未回复,**通常应选择不回复**。这是最重要的原则,以避免打扰。
- **例外**: 只有在等待时间足够长(例如超过数小时),或者你有非常重要且有时效性的新话题时,才考虑再次发言。
- **无人发言**: 如果最近的聊天记录里只有你一个人在说话,**绝对不要回复**,以防刷屏。
2. **寻找话题切入点 (如果可以回复)**:
- **强关联优先**: 优先从【情境分析】中寻找最自然、最相关的话题。顺序建议:`最近的聊天摘要` > `你和Ta的关系` > `你的日程`。
- **展现个性**: 结合你的【人设】和【情绪】,思考你会如何看待这些情境信息,并从中找到话题。
- **备选方案**: 如果实在没有强关联的话题,可以发起一个简单的日常问候。
3. **最终决策**:
- **权衡频率**: 查看【你最近的相关决策历史】。如果你在短时间内已经主动发起过多次对话,也应倾向于**不回复**,保持一定的社交距离。
- **质量胜于数量**: 宁可错过一次普通的互动机会,也不要进行一次尴尬或生硬的对话。
---
请输出你的决策:
"""
)
# =============================================================================
# 回复规划阶段 (Plan Phase)
# =============================================================================
PLAN_PROMPT = Prompt(
name="proactive_thinker_plan",
template="""
# 角色
你的名字是{bot_nickname},你的人设如下:
- 核心人设: {persona_core}
- 侧面人设: {persona_side}
- 身份: {identity}
# 任务
现在是 {current_time},你需要主动向{target_user_or_group}发起对话。
# 决策上下文
- **决策理由**: {reason}
# 情境分析
{context_specific_block}
# 对话指引
- 你决定和Ta聊聊关于“{topic}”的话题。
- **对话风格**:
- **自然开场**: 你可以根据话题和情境,选择最自然的开场方式。可以直接切入话题(如果话题关联性很强),也可以先用一句简单的问候作为过渡。**不要总是使用同一种开场白**。
- **融合情境**: 将【情境分析】中的信息巧妙地融入到对话中,让你的话语听起来更真实、更有依据。
- **符合人设**: 你的语气、用词、甚至表情符号的使用,都应该完全符合你的【角色】设定和当前【情绪】({mood_state})。
# 输出要求
- **简洁**: 不要输出任何多余内容如前缀、后缀、冒号、引号、at/@等)。
- **原创**: 不要重复之前的内容,即使意思相近也不行。
- **直接**: 只输出最终的回复文本本身。
- **风格**: 回复需简短、完整且口语化。
现在,你说:
"""
)

View File

@@ -1,6 +1,5 @@
from src.plugin_system.base.plugin_metadata import PluginMetadata
# 定义插件元数据
__plugin_meta__ = PluginMetadata(
name="MoFox-Bot工具箱",
description="一个集合多种实用功能的插件,旨在提升聊天体验和效率。",
@@ -12,6 +11,4 @@ __plugin_meta__ = PluginMetadata(
keywords=["emoji", "reaction", "like", "表情", "回应", "点赞"],
categories=["Chat", "Integration"],
extra={"is_built_in": "true", "plugin_type": "functional"},
dependencies=[],
python_dependencies=["httpx", "Pillow"],
)

View File

@@ -13,6 +13,5 @@ __plugin_meta__ = PluginMetadata(
extra={
"is_built_in": False,
"plugin_type": "tools",
},
python_dependencies = ["aiohttp", "soundfile", "pedalboard"]
}
)

View File

@@ -2,8 +2,11 @@
TTS 语音合成 Action
"""
from pathlib import Path
import toml
from src.common.logger import get_logger
from src.plugin_system.apis import generator_api
from src.plugin_system.base.base_action import BaseAction, ChatMode
from ..services.manager import get_service
@@ -11,24 +14,96 @@ from ..services.manager import get_service
logger = get_logger("tts_voice_plugin.action")
def _get_available_styles() -> list[str]:
"""动态读取配置文件获取所有可用的TTS风格名称"""
try:
# 这个路径构建逻辑是为了确保无论从哪里启动,都能准确定位到配置文件
plugin_file = Path(__file__).resolve()
# Bot/src/plugins/built_in/tts_voice_plugin/actions -> Bot
bot_root = plugin_file.parent.parent.parent.parent.parent.parent
config_file = bot_root / "config" / "plugins" / "tts_voice_plugin" / "config.toml"
if not config_file.is_file():
logger.warning("在 tts_action 中未找到 tts_voice_plugin 的配置文件,无法动态加载风格列表。")
return ["default"]
config = toml.loads(config_file.read_text(encoding="utf-8"))
styles_config = config.get("tts_styles", [])
if not isinstance(styles_config, list):
return ["default"]
# 使用显式循环和类型检查来提取 style_name以确保 Pylance 类型检查通过
style_names: list[str] = []
for style in styles_config:
if isinstance(style, dict):
name = style.get("style_name")
# 确保 name 是一个非空字符串
if isinstance(name, str) and name:
style_names.append(name)
return style_names if style_names else ["default"]
except Exception as e:
logger.error(f"动态加载TTS风格列表时出错: {e}", exc_info=True)
return ["default"] # 出现任何错误都回退
# 在类定义之前执行函数,获取风格列表
AVAILABLE_STYLES = _get_available_styles()
STYLE_OPTIONS_DESC = ", ".join(f"'{s}'" for s in AVAILABLE_STYLES)
class TTSVoiceAction(BaseAction):
"""
通过关键词或规划器自动触发 TTS 语音合成
"""
action_name = "tts_voice_action"
action_description = "使用GPT-SoVITS将文本转换为语音并发送"
action_description = "将你生成好的文本转换为语音并发送。你必须提供要转换的文本。"
mode_enable = ChatMode.ALL
parallel_action = False
action_parameters = {
"text": {
"type": "string",
"description": "需要转换为语音并发送的完整、自然、适合口语的文本内容。",
"required": True
},
"voice_style": {
"type": "string",
"description": f"语音的风格。可用选项: [{STYLE_OPTIONS_DESC}]。请根据对话的情感和上下文选择一个最合适的风格。如果未提供,将使用默认风格。",
"required": False
},
"text_language": {
"type": "string",
"description": (
"指定用于合成的语言模式,请务必根据文本内容选择最精确、范围最小的选项以获得最佳效果。"
"可用选项说明:\n"
"- 'zh': 中文与英文混合 (最优选)\n"
"- 'ja': 日文与英文混合 (最优选)\n"
"- 'yue': 粤语与英文混合 (最优选)\n"
"- 'ko': 韩文与英文混合 (最优选)\n"
"- 'en': 纯英文\n"
"- 'all_zh': 纯中文\n"
"- 'all_ja': 纯日文\n"
"- 'all_yue': 纯粤语\n"
"- 'all_ko': 纯韩文\n"
"- 'auto': 多语种混合自动识别 (备用选项,当前两种语言时优先使用上面的精确选项)\n"
"- 'auto_yue': 多语种混合自动识别(包含粤语)(备用选项)"
),
"required": False
}
}
action_require = [
"在调用此动作时,你必须在 'text' 参数中提供要合成语音的完整回复内容。这是强制性的。",
"当用户明确请求使用语音进行回复时,例如‘发个语音听听’、‘用语音说’等。",
"当对话内容适合用语音表达,例如讲故事、念诗、撒嬌或进行角色扮演时。",
"在表达特殊情感(如安慰、鼓励、庆祝)的场景下,可以主动使用语音来增强感染力。",
"不要在日常的、简短的问答或闲聊中频繁使用语音,避免打扰用户。",
"文本内容必须是纯粹的对话,不能包含任何括号或方括号括起来的动作、表情、或场景描述(例如,不要出现 '(笑)''[歪头]'",
"必须使用标准、完整的标点符号(如逗号、句号、问号)来进行自然的断句,以确保语音停顿自然,避免生成一长串没有停顿的文本"
"提供的 'text' 内容必须是纯粹的对话,不能包含任何括号或方括号括起来的动作、表情、或场景描述(例如,不要出现 '(笑)''[歪头]'",
"【**铁则**】为了确保语音停顿自然,'text' 参数中的所有断句【必须使用且仅能使用以下标准标点符号:''''''''。严禁使用 '''...' 或其他任何非标准符号来分隔句子,否则将导致语音合成失败"
]
def __init__(self, *args, **kwargs):
@@ -80,16 +155,23 @@ class TTSVoiceAction(BaseAction):
initial_text = self.action_data.get("text", "").strip()
voice_style = self.action_data.get("voice_style", "default")
logger.info(f"{self.log_prefix} 接收到规划器的初步文本: '{initial_text[:70]}...'")
# 新增:从决策模型获取指定的语言模式
text_language = self.action_data.get("text_language") # 如果模型没给,就是 None
logger.info(f"{self.log_prefix} 接收到规划器初步文本: '{initial_text[:70]}...', 指定风格: {voice_style}, 指定语言: {text_language}")
# 1. 请求主回复模型生成高质量文本
text = await self._generate_final_text(initial_text)
# 1. 使用规划器提供的文本
text = initial_text
if not text:
logger.warning(f"{self.log_prefix} 最终生成的文本为空,静默处理。")
return False, "最终生成的文本为空"
logger.warning(f"{self.log_prefix} 规划器提供的文本为空,静默处理。")
return False, "规划器提供的文本为空"
# 2. 调用 TTSService 生成语音
audio_b64 = await self.tts_service.generate_voice(text, voice_style)
logger.info(f"{self.log_prefix} 使用最终文本进行语音合成: '{text[:70]}...'")
audio_b64 = await self.tts_service.generate_voice(
text=text,
style_hint=voice_style,
language_hint=text_language # 新增:将决策模型指定的语言传递给服务
)
if audio_b64:
await self.send_custom(message_type="voice", content=audio_b64)
@@ -115,33 +197,3 @@ class TTSVoiceAction(BaseAction):
)
return False, f"语音合成出错: {e!s}"
async def _generate_final_text(self, initial_text: str) -> str:
"""请求主回复模型生成或优化文本"""
try:
generation_reason = (
"这是一个为语音消息TTS生成文本的特殊任务。"
"请基于规划器提供的初步文本,结合对话历史和自己的人设,将它优化成一句自然、富有感情、适合用语音说出的话。"
"最终指令:请务-必确保文本听起来像真实的、自然的口语对话,而不是书面语。"
)
logger.info(f"{self.log_prefix} 请求主回复模型(replyer)全新生成TTS文本...")
success, response_set, _ = await generator_api.rewrite_reply(
chat_stream=self.chat_stream,
reply_data={"raw_reply": initial_text, "reason": generation_reason},
request_type="replyer"
)
if success and response_set:
text = "".join(str(seg[1]) if isinstance(seg, tuple) else str(seg) for seg in response_set).strip()
logger.info(f"{self.log_prefix} 成功生成高质量TTS文本: {text}")
return text
if initial_text:
logger.warning(f"{self.log_prefix} 主模型生成失败,使用规划器原始文本作为兜底。")
return initial_text
raise Exception("主模型未能生成回复,且规划器也未提供兜底文本。")
except Exception as e:
logger.error(f"{self.log_prefix} 生成高质量回复内容时失败: {e}", exc_info=True)
return ""

View File

@@ -30,6 +30,7 @@ class TTSVoicePlugin(BasePlugin):
plugin_author = "Kilo Code & 靚仔"
config_file_name = "config.toml"
dependencies = []
python_dependencies = ["aiohttp", "soundfile", "pedalboard"]
permission_nodes: list[PermissionNodeField] = [
PermissionNodeField(node_name="command.use", description="是否可以使用 /tts 命令"),

View File

@@ -80,21 +80,34 @@ class TTSService:
"prompt_language": style_cfg.get("prompt_language", "zh"),
"gpt_weights": style_cfg.get("gpt_weights", default_gpt_weights),
"sovits_weights": style_cfg.get("sovits_weights", default_sovits_weights),
"speed_factor": style_cfg.get("speed_factor"), # 读取独立的语速配置
"speed_factor": style_cfg.get("speed_factor"),
"text_language": style_cfg.get("text_language", "auto"), # 新增:读取文本语言模式
}
return styles
# ... [其他方法保持不变] ...
def _detect_language(self, text: str) -> str:
chinese_chars = len(re.findall(r"[\u4e00-\u9fff]", text))
english_chars = len(re.findall(r"[a-zA-Z]", text))
def _determine_final_language(self, text: str, mode: str) -> str:
"""根据配置的语言策略和文本内容决定最终发送给API的语言代码"""
# 如果策略是具体的语言(如 all_zh, ja直接使用
if mode not in ["auto", "auto_yue"]:
return mode
# 对于 auto 和 auto_yue 策略,进行内容检测
# 优先检测粤语
if mode == "auto_yue":
cantonese_keywords = ["", "", "", "", "", "", "", "", ""]
if any(keyword in text for keyword in cantonese_keywords):
logger.info("在 auto_yue 模式下检测到粤语关键词,最终语言: yue")
return "yue"
# 检测日语(简单启发式规则)
japanese_chars = len(re.findall(r"[\u3040-\u309f\u30a0-\u30ff]", text))
total_chars = chinese_chars + english_chars + japanese_chars
if total_chars == 0: return "zh"
if chinese_chars / total_chars > 0.3: return "zh"
elif japanese_chars / total_chars > 0.3: return "ja"
elif english_chars / total_chars > 0.8: return "en"
else: return "zh"
if japanese_chars > 5 and japanese_chars > len(re.findall(r"[\u4e00-\u9fff]", text)) * 0.5:
logger.info("检测到日语字符,最终语言: ja")
return "ja"
# 默认回退到中文
logger.info(f"{mode} 模式下未检测到特定语言,默认回退到: zh")
return "zh"
def _clean_text_for_tts(self, text: str) -> str:
# 1. 基本清理
@@ -259,7 +272,7 @@ class TTSService:
logger.error(f"应用空间效果时出错: {e}", exc_info=True)
return audio_data # 如果出错,返回原始音频
async def generate_voice(self, text: str, style_hint: str = "default") -> str | None:
async def generate_voice(self, text: str, style_hint: str = "default", language_hint: str | None = None) -> str | None:
self._load_config()
if not self.tts_styles:
@@ -282,11 +295,21 @@ class TTSService:
clean_text = self._clean_text_for_tts(text)
if not clean_text: return None
text_language = self._detect_language(clean_text)
logger.info(f"开始TTS语音合成文本{clean_text[:50]}..., 风格:{style}")
# 语言决策流程:
# 1. 优先使用决策模型直接指定的 language_hint (最高优先级)
if language_hint:
final_language = language_hint
logger.info(f"使用决策模型指定的语言: {final_language}")
else:
# 2. 如果模型未指定,则使用风格配置的 language_policy
language_policy = server_config.get("text_language", "auto")
final_language = self._determine_final_language(clean_text, language_policy)
logger.info(f"决策模型未指定语言,使用策略 '{language_policy}' -> 最终语言: {final_language}")
logger.info(f"开始TTS语音合成文本{clean_text[:50]}..., 风格:{style}, 最终语言: {final_language}")
audio_data = await self._call_tts_api(
server_config=server_config, text=clean_text, text_language=text_language,
server_config=server_config, text=clean_text, text_language=final_language,
refer_wav_path=server_config.get("refer_wav_path"),
prompt_text=server_config.get("prompt_text"),
prompt_language=server_config.get("prompt_language"),

View File

@@ -1,4 +1,3 @@
from src.plugin_system.base.component_types import PythonDependency
from src.plugin_system.base.plugin_metadata import PluginMetadata
__plugin_meta__ = PluginMetadata(
@@ -14,26 +13,4 @@ __plugin_meta__ = PluginMetadata(
extra={
"is_built_in": True,
},
# Python包依赖列表
python_dependencies = [
PythonDependency(package_name="asyncddgs", description="异步DuckDuckGo搜索库", optional=False),
PythonDependency(
package_name="exa_py",
description="Exa搜索API客户端库",
optional=True, # 如果没有API密钥这个是可选的
),
PythonDependency(
package_name="tavily",
install_name="tavily-python", # 安装时使用这个名称
description="Tavily搜索API客户端库",
optional=True, # 如果没有API密钥这个是可选的
),
PythonDependency(
package_name="httpx",
version=">=0.20.0",
install_name="httpx[socks]", # 安装时使用这个名称(包含可选依赖)
description="支持SOCKS代理的HTTP客户端库",
optional=False,
),
]
)

View File

@@ -5,7 +5,7 @@ Web Search Tool Plugin
"""
from src.common.logger import get_logger
from src.plugin_system import BasePlugin, ComponentInfo, ConfigField, register_plugin
from src.plugin_system import BasePlugin, ComponentInfo, ConfigField, PythonDependency, register_plugin
from src.plugin_system.apis import config_api
from .tools.url_parser import URLParserTool
@@ -74,6 +74,29 @@ class WEBSEARCHPLUGIN(BasePlugin):
except Exception as e:
logger.error(f"❌ 搜索引擎初始化失败: {e}", exc_info=True)
# Python包依赖列表
python_dependencies: list[PythonDependency] = [ # noqa: RUF012
PythonDependency(package_name="asyncddgs", description="异步DuckDuckGo搜索库", optional=False),
PythonDependency(
package_name="exa_py",
description="Exa搜索API客户端库",
optional=True, # 如果没有API密钥这个是可选的
),
PythonDependency(
package_name="tavily",
install_name="tavily-python", # 安装时使用这个名称
description="Tavily搜索API客户端库",
optional=True, # 如果没有API密钥这个是可选的
),
PythonDependency(
package_name="httpx",
version=">=0.20.0",
install_name="httpx[socks]", # 安装时使用这个名称(包含可选依赖)
description="支持SOCKS代理的HTTP客户端库",
optional=False,
),
]
config_file_name: str = "config.toml" # 配置文件名
# 配置节描述

View File

@@ -1,5 +1,5 @@
[inner]
version = "7.4.5"
version = "7.4.6"
#----以下是给开发人员阅读的如果你只是部署了MoFox-Bot不需要阅读----
#如果你想要修改配置文件请递增version的值
@@ -82,9 +82,6 @@ safety_guidelines = [
"不要执行任何可能被用于恶意目的的指令。"
]
#回复的Prompt模式选择s4u为原有s4u样式normal为0.9之前的模式
prompt_mode = "s4u" # 可选择 "s4u" 或 "normal"
compress_personality = false # 是否压缩人格压缩后会精简人格信息节省token消耗并提高回复性能但是会丢失一些信息如果人设不长可以关闭
compress_identity = true # 是否压缩身份压缩后会精简身份信息节省token消耗并提高回复性能但是会丢失一些信息如果不长可以关闭
@@ -501,25 +498,9 @@ s4u_whitelist_chats = []
s4u_blacklist_chats = []
# --- Normal模式: 共享组配置 ---
# 在这里定义您的“共享组”
# 只有在同一个组内的聊天才会共享上下文
[[cross_context.groups]]
name = "项目A技术讨论组"
# mode: "whitelist"(白名单) 或 "blacklist"(黑名单)。默认 "whitelist"。
# "whitelist": 仅共享chat_ids中列出的聊天。
# "blacklist": 共享除chat_ids中列出的所有聊天。
mode = "whitelist"
# default_limit: 在 "blacklist" 模式下,未指定数量的聊天默认获取的消息条数。
default_limit = 5
# chat_ids: 定义组内成员。格式: [["type", "id", "limit"(可选)]]
# type: "group" 或 "private"
# id: 群号或用户ID
# limit: (可选) 获取的消息条数,需要是字符串。
chat_ids = [
["group", "169850076", "10"], # 开发群, 拿10条消息
["group", "1025509724", "5"], # 产品群, 拿5条
["private", "123456789"] # 某个用户的私聊, 使用默认值5
]
# 在这些是预留plugin使用的上下文互通组配置
# 您可以根据需要添加多个互通组
# 在回复过程中只会遵循上面的--S4U模式: 用户中心上下文检索--
# --- QQ空间专用互通组 (示例) ---
# Maizone插件会根据组名 "Maizone默认互通组" 来获取上下文