This commit is contained in:
Windpicker-owo
2025-10-01 04:56:41 +08:00
26 changed files with 980 additions and 1502 deletions

View File

@@ -1,190 +0,0 @@
import re
from typing import List, Tuple, Type
from src.plugin_system import (
BasePlugin,
register_plugin,
BaseAction,
ComponentInfo,
ActionActivationType,
ConfigField,
)
from src.common.logger import get_logger
from .qq_emoji_list import qq_face
from src.plugin_system.base.component_types import ChatType
logger = get_logger("set_emoji_like_plugin")
def get_emoji_id(emoji_input: str) -> str | None:
"""根据输入获取表情ID"""
# 如果输入本身就是数字ID直接返回
if emoji_input.isdigit() or (isinstance(emoji_input, str) and emoji_input.startswith("😊")):
if emoji_input in qq_face:
return emoji_input
# 尝试从 "[表情xxx]" 格式中提取
match = re.search(r"\[表情:(.+?)\]", emoji_input)
if match:
emoji_name = match.group(1).strip()
else:
emoji_name = emoji_input.strip()
# 遍历查找
for key, value in qq_face.items():
# value 的格式是 "[表情xxx]"
if f"[表情:{emoji_name}]" == value:
return key
return None
# ===== Action组件 =====
class SetEmojiLikeAction(BaseAction):
"""设置消息表情回应"""
# === 基本信息(必须填写)===
action_name = "set_emoji_like"
action_description = "为某条已经存在的消息添加‘贴表情’回应(类似点赞),而不是发送新消息。可以在觉得某条消息非常有趣、值得赞同或者需要特殊情感回应时主动使用。"
activation_type = ActionActivationType.ALWAYS # 消息接收时激活(?)
chat_type_allow = ChatType.GROUP
parallel_action = True
# === 功能描述(必须填写)===
# 从 qq_face 字典中提取所有表情名称用于提示
emoji_options = []
for name in qq_face.values():
match = re.search(r"\[表情:(.+?)\]", name)
if match:
emoji_options.append(match.group(1))
action_parameters = {
"emoji": f"要回应的表情,必须从以下表情中选择: {', '.join(emoji_options)}",
"set": "是否设置回应 (True/False)",
}
action_require = [
"当需要对一个已存在消息进行‘贴表情’回应时使用",
"这是一个对旧消息的操作,而不是发送新消息",
"如果你想发送一个新的表情包消息,请使用 'emoji' 动作",
]
llm_judge_prompt = """
判定是否需要使用贴表情动作的条件:
1. 用户明确要求使用贴表情包
2. 这是一个适合表达强烈情绪的场合
3. 不要发送太多表情包,如果你已经发送过多个表情包则回答""
请回答""""
"""
associated_types = ["text"]
async def execute(self) -> Tuple[bool, str]:
"""执行设置表情回应的动作"""
message_id = None
if self.has_action_message:
logger.debug(str(self.action_message))
if isinstance(self.action_message, dict):
message_id = self.action_message.get("message_id")
logger.info(f"获取到的消息ID: {message_id}")
else:
logger.error("未提供消息ID")
await self.store_action_info(
action_build_into_prompt=True,
action_prompt_display=f"执行了set_emoji_like动作{self.action_name},失败: 未提供消息ID",
action_done=False,
)
return False, "未提供消息ID"
emoji_input = self.action_data.get("emoji")
set_like = self.action_data.get("set", True)
if not emoji_input:
logger.error("未提供表情")
return False, "未提供表情"
logger.info(f"设置表情回应: {emoji_input}, 是否设置: {set_like}")
emoji_id = get_emoji_id(emoji_input)
if not emoji_id:
logger.error(f"找不到表情: '{emoji_input}'。请从可用列表中选择。")
await self.store_action_info(
action_build_into_prompt=True,
action_prompt_display=f"执行了set_emoji_like动作{self.action_name},失败: 找不到表情: '{emoji_input}'",
action_done=False,
)
return False, f"找不到表情: '{emoji_input}'。请从可用列表中选择。"
# 4. 使用适配器API发送命令
if not message_id:
logger.error("未提供消息ID")
await self.store_action_info(
action_build_into_prompt=True,
action_prompt_display=f"执行了set_emoji_like动作{self.action_name},失败: 未提供消息ID",
action_done=False,
)
return False, "未提供消息ID"
try:
# 使用适配器API发送贴表情命令
success = await self.send_command(
command_name="set_emoji_like", args={"message_id": message_id, "emoji_id": emoji_id, "set": set_like}, storage_message=False
)
if success:
logger.info("设置表情回应成功")
await self.store_action_info(
action_build_into_prompt=True,
action_prompt_display=f"执行了set_emoji_like动作,{emoji_input},设置表情回应: {emoji_id}, 是否设置: {set_like}",
action_done=True,
)
return True, "成功设置表情回应"
else:
logger.error("设置表情回应失败")
await self.store_action_info(
action_build_into_prompt=True,
action_prompt_display=f"执行了set_emoji_like动作{self.action_name},失败",
action_done=False,
)
return False, "设置表情回应失败"
except Exception as e:
logger.error(f"设置表情回应失败: {e}")
await self.store_action_info(
action_build_into_prompt=True,
action_prompt_display=f"执行了set_emoji_like动作{self.action_name},失败: {e}",
action_done=False,
)
return False, f"设置表情回应失败: {e}"
# ===== 插件注册 =====
@register_plugin
class SetEmojiLikePlugin(BasePlugin):
"""设置消息表情回应插件"""
# 插件基本信息
plugin_name: str = "set_emoji_like" # 内部标识符
enable_plugin: bool = True
dependencies: List[str] = [] # 插件依赖列表
python_dependencies: List[str] = [] # Python包依赖列表现在使用内置API
config_file_name: str = "config.toml" # 配置文件名
# 配置节描述
config_section_descriptions = {"plugin": "插件基本信息", "components": "插件组件"}
# 配置Schema定义
config_schema: dict = {
"plugin": {
"name": ConfigField(type=str, default="set_emoji_like", description="插件名称"),
"version": ConfigField(type=str, default="1.0.0", description="插件版本"),
"enabled": ConfigField(type=bool, default=True, description="是否启用插件"),
"config_version": ConfigField(type=str, default="1.1", description="配置版本"),
},
"components": {
"action_set_emoji_like": ConfigField(type=bool, default=True, description="是否启用设置表情回应功能"),
},
}
def get_plugin_components(self) -> List[Tuple[ComponentInfo, Type]]:
if self.get_config("components.action_set_emoji_like"):
return [
(SetEmojiLikeAction.get_action_info(), SetEmojiLikeAction),
]
return []

View File

@@ -208,6 +208,7 @@ MoFox_Bot(第三方修改版)
from src.plugin_system.apis.permission_api import permission_api from src.plugin_system.apis.permission_api import permission_api
permission_manager = PermissionManager() permission_manager = PermissionManager()
await permission_manager.initialize()
permission_api.set_permission_manager(permission_manager) permission_api.set_permission_manager(permission_manager)
logger.info("权限管理器初始化成功") logger.info("权限管理器初始化成功")
@@ -318,7 +319,6 @@ MoFox_Bot(第三方修改版)
] ]
# 增强记忆系统不需要定时任务,已禁用原有记忆系统的定时任务 # 增强记忆系统不需要定时任务,已禁用原有记忆系统的定时任务
logger.info("原有记忆系统定时任务已禁用 - 使用增强记忆系统")
await asyncio.gather(*tasks) await asyncio.gather(*tasks)

View File

@@ -2,7 +2,7 @@ import time
import asyncio import asyncio
from abc import ABC, abstractmethod from abc import ABC, abstractmethod
from typing import Tuple, Optional from typing import Tuple, Optional, List, Dict, Any
from src.common.logger import get_logger from src.common.logger import get_logger
from src.chat.message_receive.chat_stream import ChatStream from src.chat.message_receive.chat_stream import ChatStream
@@ -27,8 +27,21 @@ class BaseAction(ABC):
- parallel_action: 是否允许并行执行 - parallel_action: 是否允许并行执行
- random_activation_probability: 随机激活概率 - random_activation_probability: 随机激活概率
- llm_judge_prompt: LLM判断提示词 - llm_judge_prompt: LLM判断提示词
二步Action相关属性
- is_two_step_action: 是否为二步Action
- step_one_description: 第一步的描述
- sub_actions: 子Action列表
""" """
# 二步Action相关类属性
is_two_step_action: bool = False
"""是否为二步Action。如果为TrueAction将分两步执行第一步选择操作第二步执行具体操作"""
step_one_description: str = ""
"""第一步的描述用于向LLM展示Action的基本功能"""
sub_actions: List[Tuple[str, str, Dict[str, str]]] = []
"""子Action列表格式为[(子Action名, 子Action描述, 子Action参数)]。仅在二步Action中使用"""
def __init__( def __init__(
self, self,
action_data: dict, action_data: dict,
@@ -93,6 +106,13 @@ class BaseAction(ABC):
self.associated_types: list[str] = getattr(self.__class__, "associated_types", []).copy() self.associated_types: list[str] = getattr(self.__class__, "associated_types", []).copy()
self.chat_type_allow: ChatType = getattr(self.__class__, "chat_type_allow", ChatType.ALL) self.chat_type_allow: ChatType = getattr(self.__class__, "chat_type_allow", ChatType.ALL)
# 二步Action相关实例属性
self.is_two_step_action: bool = getattr(self.__class__, "is_two_step_action", False)
self.step_one_description: str = getattr(self.__class__, "step_one_description", "")
self.sub_actions: List[Tuple[str, str, Dict[str, str]]] = getattr(self.__class__, "sub_actions", []).copy()
self._selected_sub_action: Optional[str] = None
"""当前选择的子Action名称用于二步Action的状态管理"""
# ============================================================================= # =============================================================================
# 便捷属性 - 直接在初始化时获取常用聊天信息(带类型注解) # 便捷属性 - 直接在初始化时获取常用聊天信息(带类型注解)
# ============================================================================= # =============================================================================
@@ -412,23 +432,32 @@ class BaseAction(ABC):
logger.warning(f"{log_prefix} 未找到Action组件信息: {action_name}") logger.warning(f"{log_prefix} 未找到Action组件信息: {action_name}")
return False, f"未找到Action组件信息: {action_name}" return False, f"未找到Action组件信息: {action_name}"
plugin_config = component_registry.get_plugin_config(component_info.plugin_name) # 确保获取的是Action组件
if component_info.component_type != ComponentType.ACTION:
logger.error(f"{log_prefix} 尝试调用的组件 '{action_name}' 不是一个Action而是一个 '{component_info.component_type.value}'")
return False, f"组件 '{action_name}' 不是一个有效的Action"
plugin_config = component_registry.get_plugin_config(component_info.plugin_name)
# 3. 实例化被调用的Action # 3. 实例化被调用的Action
action_instance = action_class( action_params = {
action_data=called_action_data, "action_data": called_action_data,
reasoning=f"Called by {self.action_name}", "reasoning": f"Called by {self.action_name}",
cycle_timers=self.cycle_timers, "cycle_timers": self.cycle_timers,
thinking_id=self.thinking_id, "thinking_id": self.thinking_id,
chat_stream=self.chat_stream, "chat_stream": self.chat_stream,
log_prefix=log_prefix, "log_prefix": log_prefix,
plugin_config=plugin_config, "plugin_config": plugin_config,
action_message=self.action_message, "action_message": self.action_message,
) }
action_instance = action_class(**action_params)
# 4. 执行Action # 4. 执行Action
logger.debug(f"{log_prefix} 开始执行...") logger.debug(f"{log_prefix} 开始执行...")
result = await action_instance.execute() execute_result = await action_instance.execute()
# 确保返回类型符合 (bool, str) 格式
is_success = execute_result[0] if isinstance(execute_result, tuple) and len(execute_result) > 0 else False
message = execute_result[1] if isinstance(execute_result, tuple) and len(execute_result) > 1 else ""
result = (is_success, str(message))
logger.info(f"{log_prefix} 执行完成,结果: {result}") logger.info(f"{log_prefix} 执行完成,结果: {result}")
return result return result
@@ -477,15 +506,73 @@ class BaseAction(ABC):
action_require=getattr(cls, "action_require", []).copy(), action_require=getattr(cls, "action_require", []).copy(),
associated_types=getattr(cls, "associated_types", []).copy(), associated_types=getattr(cls, "associated_types", []).copy(),
chat_type_allow=getattr(cls, "chat_type_allow", ChatType.ALL), chat_type_allow=getattr(cls, "chat_type_allow", ChatType.ALL),
# 二步Action相关属性
is_two_step_action=getattr(cls, "is_two_step_action", False),
step_one_description=getattr(cls, "step_one_description", ""),
sub_actions=getattr(cls, "sub_actions", []).copy(),
) )
async def handle_step_one(self) -> Tuple[bool, str]:
"""处理二步Action的第一步
Returns:
Tuple[bool, str]: (是否执行成功, 回复文本)
"""
if not self.is_two_step_action:
return False, "此Action不是二步Action"
# 检查action_data中是否包含选择的子Action
selected_action = self.action_data.get("selected_action")
if not selected_action:
# 第一步展示可用的子Action
available_actions = [sub_action[0] for sub_action in self.sub_actions]
description = self.step_one_description or f"{self.action_name}支持以下操作"
actions_list = "\n".join([f"- {action}: {desc}" for action, desc, _ in self.sub_actions])
response = f"{description}\n\n可用操作:\n{actions_list}\n\n请选择要执行的操作。"
return True, response
else:
# 验证选择的子Action是否有效
valid_actions = [sub_action[0] for sub_action in self.sub_actions]
if selected_action not in valid_actions:
return False, f"无效的操作选择: {selected_action}。可用操作: {valid_actions}"
# 保存选择的子Action
self._selected_sub_action = selected_action
# 调用第二步执行
return await self.execute_step_two(selected_action)
async def execute_step_two(self, sub_action_name: str) -> Tuple[bool, str]:
"""执行二步Action的第二步
Args:
sub_action_name: 子Action名称
Returns:
Tuple[bool, str]: (是否执行成功, 回复文本)
"""
if not self.is_two_step_action:
return False, "此Action不是二步Action"
# 子类需要重写此方法来实现具体的第二步逻辑
return False, f"二步Action必须实现execute_step_two方法来处理操作: {sub_action_name}"
@abstractmethod @abstractmethod
async def execute(self) -> Tuple[bool, str]: async def execute(self) -> Tuple[bool, str]:
"""执行Action的抽象方法子类必须实现 """执行Action的抽象方法子类必须实现
对于二步Action会自动处理第一步逻辑
Returns: Returns:
Tuple[bool, str]: (是否执行成功, 回复文本) Tuple[bool, str]: (是否执行成功, 回复文本)
""" """
# 如果是二步Action自动处理第一步
if self.is_two_step_action:
return await self.handle_step_one()
# 普通Action由子类实现
pass pass
async def handle_action(self) -> Tuple[bool, str]: async def handle_action(self) -> Tuple[bool, str]:

View File

@@ -38,6 +38,14 @@ class BaseTool(ABC):
semantic_cache_query_key: Optional[str] = None semantic_cache_query_key: Optional[str] = None
"""用于语义缓存的查询参数键名。如果设置,将使用此参数的值进行语义相似度搜索""" """用于语义缓存的查询参数键名。如果设置,将使用此参数的值进行语义相似度搜索"""
# 二步工具调用相关属性
is_two_step_tool: bool = False
"""是否为二步工具。如果为True工具将分两步调用第一步展示工具信息第二步执行具体操作"""
step_one_description: str = ""
"""第一步的描述用于向LLM展示工具的基本功能"""
sub_tools: List[Tuple[str, str, List[Tuple[str, ToolParamType, str, bool, List[str] | None]]]] = []
"""子工具列表,格式为[(子工具名, 子工具描述, 子工具参数)]。仅在二步工具中使用"""
def __init__(self, plugin_config: Optional[dict] = None): def __init__(self, plugin_config: Optional[dict] = None):
self.plugin_config = plugin_config or {} # 直接存储插件配置字典 self.plugin_config = plugin_config or {} # 直接存储插件配置字典
@@ -48,10 +56,64 @@ class BaseTool(ABC):
Returns: Returns:
dict: 工具定义字典 dict: 工具定义字典
""" """
if not cls.name or not cls.description or not cls.parameters: if not cls.name or not cls.description:
raise NotImplementedError(f"工具类 {cls.__name__} 必须定义 name, description 和 parameters 属性") raise NotImplementedError(f"工具类 {cls.__name__} 必须定义 name description 属性")
return {"name": cls.name, "description": cls.description, "parameters": cls.parameters} # 如果是二步工具,第一步只返回基本信息
if cls.is_two_step_tool:
return {
"name": cls.name,
"description": cls.step_one_description or cls.description,
"parameters": [("action", ToolParamType.STRING, "选择要执行的操作", True, [sub_tool[0] for sub_tool in cls.sub_tools])]
}
else:
# 普通工具需要parameters
if not cls.parameters:
raise NotImplementedError(f"工具类 {cls.__name__} 必须定义 parameters 属性")
return {"name": cls.name, "description": cls.description, "parameters": cls.parameters}
@classmethod
def get_step_two_tool_definition(cls, sub_tool_name: str) -> dict[str, Any]:
"""获取二步工具的第二步定义
Args:
sub_tool_name: 子工具名称
Returns:
dict: 第二步工具定义字典
"""
if not cls.is_two_step_tool:
raise ValueError(f"工具 {cls.name} 不是二步工具")
# 查找对应的子工具
for sub_name, sub_desc, sub_params in cls.sub_tools:
if sub_name == sub_tool_name:
return {
"name": f"{cls.name}_{sub_tool_name}",
"description": sub_desc,
"parameters": sub_params
}
raise ValueError(f"未找到子工具: {sub_tool_name}")
@classmethod
def get_all_sub_tool_definitions(cls) -> List[dict[str, Any]]:
"""获取所有子工具的定义
Returns:
List[dict]: 所有子工具定义列表
"""
if not cls.is_two_step_tool:
return []
definitions = []
for sub_name, sub_desc, sub_params in cls.sub_tools:
definitions.append({
"name": f"{cls.name}_{sub_name}",
"description": sub_desc,
"parameters": sub_params
})
return definitions
@classmethod @classmethod
def get_tool_info(cls) -> ToolInfo: def get_tool_info(cls) -> ToolInfo:
@@ -79,8 +141,68 @@ class BaseTool(ABC):
Returns: Returns:
dict: 工具执行结果 dict: 工具执行结果
""" """
# 如果是二步工具,处理第一步调用
if self.is_two_step_tool and "action" in function_args:
return await self._handle_step_one(function_args)
raise NotImplementedError("子类必须实现execute方法") raise NotImplementedError("子类必须实现execute方法")
async def _handle_step_one(self, function_args: dict[str, Any]) -> dict[str, Any]:
"""处理二步工具的第一步调用
Args:
function_args: 包含action参数的函数参数
Returns:
dict: 第一步执行结果,包含第二步的工具定义
"""
action = function_args.get("action")
if not action:
return {"error": "缺少action参数"}
# 查找对应的子工具
sub_tool_found = None
for sub_name, sub_desc, sub_params in self.sub_tools:
if sub_name == action:
sub_tool_found = (sub_name, sub_desc, sub_params)
break
if not sub_tool_found:
available_actions = [sub_tool[0] for sub_tool in self.sub_tools]
return {"error": f"未知的操作: {action}。可用操作: {available_actions}"}
sub_name, sub_desc, sub_params = sub_tool_found
# 返回第二步工具定义
step_two_definition = {
"name": f"{self.name}_{sub_name}",
"description": sub_desc,
"parameters": sub_params
}
return {
"type": "two_step_tool_step_one",
"content": f"已选择操作: {action}。请使用以下工具进行具体调用:",
"next_tool_definition": step_two_definition,
"selected_action": action
}
async def execute_step_two(self, sub_tool_name: str, function_args: dict[str, Any]) -> dict[str, Any]:
"""执行二步工具的第二步
Args:
sub_tool_name: 子工具名称
function_args: 工具调用参数
Returns:
dict: 工具执行结果
"""
if not self.is_two_step_tool:
raise ValueError(f"工具 {self.name} 不是二步工具")
# 子类需要重写此方法来实现具体的第二步逻辑
raise NotImplementedError("二步工具必须实现execute_step_two方法")
async def direct_execute(self, **kwargs: dict[str, Any]) -> dict[str, Any]: async def direct_execute(self, **kwargs: dict[str, Any]) -> dict[str, Any]:
"""直接执行工具函数(供插件调用) """直接执行工具函数(供插件调用)
通过该方法,插件可以直接调用工具,而不需要传入字典格式的参数 通过该方法,插件可以直接调用工具,而不需要传入字典格式的参数

View File

@@ -142,6 +142,10 @@ class ActionInfo(ComponentInfo):
mode_enable: ChatMode = ChatMode.ALL mode_enable: ChatMode = ChatMode.ALL
parallel_action: bool = False parallel_action: bool = False
chat_type_allow: ChatType = ChatType.ALL # 允许的聊天类型 chat_type_allow: ChatType = ChatType.ALL # 允许的聊天类型
# 二步Action相关属性
is_two_step_action: bool = False # 是否为二步Action
step_one_description: str = "" # 第一步的描述
sub_actions: List[Tuple[str, str, Dict[str, str]]] = field(default_factory=list) # 子Action列表
def __post_init__(self): def __post_init__(self):
super().__post_init__() super().__post_init__()
@@ -153,6 +157,8 @@ class ActionInfo(ComponentInfo):
self.action_require = [] self.action_require = []
if self.associated_types is None: if self.associated_types is None:
self.associated_types = [] self.associated_types = []
if self.sub_actions is None:
self.sub_actions = []
self.component_type = ComponentType.ACTION self.component_type = ComponentType.ACTION

View File

@@ -55,6 +55,10 @@ class ToolExecutor:
self.llm_model = LLMRequest(model_set=model_config.model_task_config.tool_use, request_type="tool_executor") self.llm_model = LLMRequest(model_set=model_config.model_task_config.tool_use, request_type="tool_executor")
# 二步工具调用状态管理
self._pending_step_two_tools: Dict[str, Dict[str, Any]] = {}
"""待处理的第二步工具调用,格式为 {tool_name: step_two_definition}"""
logger.info(f"{self.log_prefix}工具执行器初始化完成") logger.info(f"{self.log_prefix}工具执行器初始化完成")
async def execute_from_chat_message( async def execute_from_chat_message(
@@ -112,7 +116,18 @@ class ToolExecutor:
def _get_tool_definitions(self) -> List[Dict[str, Any]]: def _get_tool_definitions(self) -> List[Dict[str, Any]]:
all_tools = get_llm_available_tool_definitions() all_tools = get_llm_available_tool_definitions()
user_disabled_tools = global_announcement_manager.get_disabled_chat_tools(self.chat_id) user_disabled_tools = global_announcement_manager.get_disabled_chat_tools(self.chat_id)
return [definition for name, definition in all_tools if name not in user_disabled_tools]
# 获取基础工具定义(包括二步工具的第一步)
tool_definitions = [definition for name, definition in all_tools if name not in user_disabled_tools]
# 检查是否有待处理的二步工具第二步调用
pending_step_two = getattr(self, '_pending_step_two_tools', {})
if pending_step_two:
# 添加第二步工具定义
for tool_name, step_two_def in pending_step_two.items():
tool_definitions.append(step_two_def)
return tool_definitions
async def execute_tool_calls(self, tool_calls: Optional[List[ToolCall]]) -> Tuple[List[Dict[str, Any]], List[str]]: async def execute_tool_calls(self, tool_calls: Optional[List[ToolCall]]) -> Tuple[List[Dict[str, Any]], List[str]]:
"""执行工具调用 """执行工具调用
@@ -251,6 +266,32 @@ class ToolExecutor:
f"{self.log_prefix} 正在执行工具: [bold green]{function_name}[/bold green] | 参数: {function_args}" f"{self.log_prefix} 正在执行工具: [bold green]{function_name}[/bold green] | 参数: {function_args}"
) )
function_args["llm_called"] = True # 标记为LLM调用 function_args["llm_called"] = True # 标记为LLM调用
# 检查是否是二步工具的第二步调用
if "_" in function_name and function_name.count("_") >= 1:
# 可能是二步工具的第二步调用,格式为 "tool_name_sub_tool_name"
parts = function_name.split("_", 1)
if len(parts) == 2:
base_tool_name, sub_tool_name = parts
base_tool_instance = get_tool_instance(base_tool_name)
if base_tool_instance and base_tool_instance.is_two_step_tool:
logger.info(f"{self.log_prefix}执行二步工具第二步: {base_tool_name}.{sub_tool_name}")
result = await base_tool_instance.execute_step_two(sub_tool_name, function_args)
# 清理待处理的第二步工具
self._pending_step_two_tools.pop(base_tool_name, None)
if result:
logger.debug(f"{self.log_prefix}二步工具第二步 {function_name} 执行成功")
return {
"tool_call_id": tool_call.call_id,
"role": "tool",
"name": function_name,
"type": "function",
"content": result.get("content", ""),
}
# 获取对应工具实例 # 获取对应工具实例
tool_instance = tool_instance or get_tool_instance(function_name) tool_instance = tool_instance or get_tool_instance(function_name)
if not tool_instance: if not tool_instance:
@@ -260,6 +301,16 @@ class ToolExecutor:
# 执行工具并记录日志 # 执行工具并记录日志
logger.debug(f"{self.log_prefix}执行工具 {function_name},参数: {function_args}") logger.debug(f"{self.log_prefix}执行工具 {function_name},参数: {function_args}")
result = await tool_instance.execute(function_args) result = await tool_instance.execute(function_args)
# 检查是否是二步工具的第一步结果
if result and result.get("type") == "two_step_tool_step_one":
logger.info(f"{self.log_prefix}二步工具第一步完成: {function_name}")
# 保存第二步工具定义
next_tool_def = result.get("next_tool_definition")
if next_tool_def:
self._pending_step_two_tools[function_name] = next_tool_def
logger.debug(f"{self.log_prefix}已保存第二步工具定义: {next_tool_def['name']}")
if result: if result:
logger.debug(f"{self.log_prefix}工具 {function_name} 执行成功,结果: {result}") logger.debug(f"{self.log_prefix}工具 {function_name} 执行成功,结果: {result}")
return { return {

View File

@@ -644,7 +644,7 @@ class ChatterPlanFilter:
# 为参数描述添加一个通用示例值 # 为参数描述添加一个通用示例值
if action_name == "set_emoji_like" and p_name == "emoji": if action_name == "set_emoji_like" and p_name == "emoji":
# 特殊处理set_emoji_like的emoji参数 # 特殊处理set_emoji_like的emoji参数
from plugins.set_emoji_like.qq_emoji_list import qq_face from src.plugins.built_in.social_toolkit_plugin.qq_emoji_list import qq_face
emoji_options = [re.search(r"\[表情:(.+?)\]", name).group(1) for name in qq_face.values() if re.search(r"\[表情:(.+?)\]", name)] emoji_options = [re.search(r"\[表情:(.+?)\]", name).group(1) for name in qq_face.values() if re.search(r"\[表情:(.+?)\]", name)]
example_value = f"<从'{', '.join(emoji_options[:10])}...'中选择一个>" example_value = f"<从'{', '.join(emoji_options[:10])}...'中选择一个>"
else: else:

View File

@@ -1,11 +0,0 @@
{
"manifest_version": 1,
"name": "At User Plugin",
"description": "一个通过名字艾特用户的插件",
"author": "Kilo Code",
"version": "1.0.0",
"requirements": [],
"license": "MIT",
"keywords": ["at", "mention", "user"],
"categories": ["Chat", "Utility"]
}

View File

@@ -1,173 +0,0 @@
from typing import List, Tuple, Type
from src.plugin_system import (
BasePlugin,
BaseCommand,
CommandInfo,
register_plugin,
BaseAction,
ActionInfo,
ActionActivationType,
)
from src.person_info.person_info import get_person_info_manager
from src.common.logger import get_logger
from src.plugin_system.base.component_types import ChatType
logger = get_logger(__name__)
class AtAction(BaseAction):
"""发送艾特消息"""
# === 基本信息(必须填写)===
action_name = "at_user"
action_description = "发送艾特消息"
activation_type = ActionActivationType.LLM_JUDGE # 消息接收时激活(?)
parallel_action = False
chat_type_allow = ChatType.GROUP
# === 功能描述(必须填写)===
action_parameters = {"user_name": "需要艾特用户的名字", "at_message": "艾特用户时要发送的消息"}
action_require = [
"当用户明确要求你去'''''提醒''艾特'某人时使用",
"当你判断,为了让特定的人看到消息,需要代表用户去呼叫他/她时使用",
"例如:'你去叫一下张三''提醒一下李四开会'",
]
llm_judge_prompt = """
判定是否需要使用艾特用户动作的条件:
1. 你在对话中提到了某个具体的人,并且需要提醒他/她。
3. 上下文明确需要你艾特一个或多个人。
请回答""""
"""
associated_types = ["text"]
async def execute(self) -> Tuple[bool, str]:
"""执行艾特用户的动作"""
user_name = self.action_data.get("user_name")
at_message = self.action_data.get("at_message")
if not user_name or not at_message:
logger.warning("艾特用户的动作缺少必要参数。")
await self.store_action_info(
action_build_into_prompt=True,
action_prompt_display=f"执行了艾特用户动作:艾特用户 {user_name} 并发送消息: {at_message},失败了,因为没有提供必要参数",
action_done=False,
)
return False, "缺少必要参数"
user_info = await get_person_info_manager().get_person_info_by_name(user_name)
if not user_info or not user_info.get("user_id"):
logger.info(f"找不到名为 '{user_name}' 的用户。")
return False, "用户不存在"
try:
# 使用回复器生成艾特回复,而不是直接发送命令
from src.chat.replyer.default_generator import DefaultReplyer
from src.chat.message_receive.chat_stream import get_chat_manager
# 获取当前聊天流
chat_manager = get_chat_manager()
chat_stream = self.chat_stream or chat_manager.get_stream(self.chat_id)
if not chat_stream:
logger.error(f"找不到聊天流: {self.chat_stream}")
return False, "聊天流不存在"
# 创建回复器实例
replyer = DefaultReplyer(chat_stream)
# 构建回复对象,将艾特消息作为回复目标
reply_to = f"{user_name}:{at_message}"
extra_info = f"你需要艾特用户 {user_name} 并回复他们说: {at_message}"
# 使用回复器生成回复
success, llm_response, prompt = await replyer.generate_reply_with_context(
reply_to=reply_to,
extra_info=extra_info,
enable_tool=False, # 艾特回复通常不需要工具调用
from_plugin=False,
)
if success and llm_response:
# 获取生成的回复内容
reply_content = llm_response.get("content", "")
if reply_content:
# 获取用户QQ号发送真正的艾特消息
user_id = user_info.get("user_id")
# 发送真正的艾特命令,使用回复器生成的智能内容
await self.send_command(
"SEND_AT_MESSAGE",
args={"qq_id": user_id, "text": reply_content},
display_message=f"艾特用户 {user_name} 并发送智能回复: {reply_content}",
)
await self.store_action_info(
action_build_into_prompt=True,
action_prompt_display=f"执行了艾特用户动作:艾特用户 {user_name} 并发送智能回复: {reply_content}",
action_done=True,
)
logger.info(f"成功通过回复器生成智能内容并发送真正的艾特消息给 {user_name}: {reply_content}")
return True, "智能艾特消息发送成功"
else:
logger.warning("回复器生成了空内容")
return False, "回复内容为空"
else:
logger.error("回复器生成回复失败")
return False, "回复生成失败"
except Exception as e:
logger.error(f"执行艾特用户动作时发生异常: {e}", exc_info=True)
await self.store_action_info(
action_build_into_prompt=True,
action_prompt_display=f"执行艾特用户动作失败:{str(e)}",
action_done=False,
)
return False, f"执行失败: {str(e)}"
class AtCommand(BaseCommand):
command_name: str = "at_user"
description: str = "通过名字艾特用户"
command_pattern: str = r"/at\s+@?(?P<name>[\S]+)(?:\s+(?P<text>.*))?"
async def execute(self) -> Tuple[bool, str, bool]:
name = self.matched_groups.get("name")
text = self.matched_groups.get("text", "")
if not name:
await self.send_text("请指定要艾特的用户名称。")
return False, "缺少用户名称", True
person_info_manager = get_person_info_manager()
user_info = await person_info_manager.get_person_info_by_name(name)
if not user_info or not user_info.get("user_id"):
await self.send_text(f"找不到名为 '{name}' 的用户。")
return False, "用户不存在", True
user_id = user_info.get("user_id")
await self.send_command(
"SEND_AT_MESSAGE",
args={"qq_id": user_id, "text": text},
display_message=f"艾特用户 {name} 并发送消息: {text}",
)
return True, "艾特消息已发送", True
@register_plugin
class AtUserPlugin(BasePlugin):
plugin_name: str = "at_user_plugin"
enable_plugin: bool = True
dependencies: list[str] = []
python_dependencies: list[str] = []
config_file_name: str = "config.toml"
config_schema: dict = {}
def get_plugin_components(self) -> List[Tuple[CommandInfo | ActionInfo, Type[BaseCommand] | Type[BaseAction]]]:
return [
(AtAction.get_action_info(), AtAction),
]

View File

@@ -88,25 +88,27 @@ class MaiZoneRefactoredPlugin(BasePlugin):
def __init__(self, *args, **kwargs): def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs) super().__init__(*args, **kwargs)
async def on_plugin_loaded(self): async def on_plugin_loaded(self):
"""插件加载完成后的回调,初始化服务并启动后台任务"""
# --- 注册权限节点 ---
await permission_api.register_permission_node( await permission_api.register_permission_node(
"plugin.maizone.send_feed", "是否可以使用机器人发送QQ空间说说", "maiZone", False "plugin.maizone.send_feed", "是否可以使用机器人发送QQ空间说说", "maiZone", False
) )
await permission_api.register_permission_node( await permission_api.register_permission_node(
"plugin.maizone.read_feed", "是否可以使用机器人读取QQ空间说说", "maiZone", True "plugin.maizone.read_feed", "是否可以使用机器人读取QQ空间说说", "maiZone", True
) )
# 创建所有服务实例
# --- 创建并注册所有服务实例 ---
content_service = ContentService(self.get_config) content_service = ContentService(self.get_config)
image_service = ImageService(self.get_config) image_service = ImageService(self.get_config)
cookie_service = CookieService(self.get_config) cookie_service = CookieService(self.get_config)
reply_tracker_service = ReplyTrackerService() reply_tracker_service = ReplyTrackerService()
# 使用已创建的 reply_tracker_service 实例
qzone_service = QZoneService( qzone_service = QZoneService(
self.get_config, self.get_config,
content_service, content_service,
image_service, image_service,
cookie_service, cookie_service,
reply_tracker_service, # 传入已创建的实例 reply_tracker_service,
) )
scheduler_service = SchedulerService(self.get_config, qzone_service) scheduler_service = SchedulerService(self.get_config, qzone_service)
monitor_service = MonitorService(self.get_config, qzone_service) monitor_service = MonitorService(self.get_config, qzone_service)
@@ -115,18 +117,12 @@ class MaiZoneRefactoredPlugin(BasePlugin):
register_service("reply_tracker", reply_tracker_service) register_service("reply_tracker", reply_tracker_service)
register_service("get_config", self.get_config) register_service("get_config", self.get_config)
# 保存服务引用以便后续启动 logger.info("MaiZone重构版插件服务已注册。")
self.scheduler_service = scheduler_service
self.monitor_service = monitor_service
logger.info("MaiZone重构版插件已加载服务已注册。") # --- 启动后台任务 ---
asyncio.create_task(scheduler_service.start())
async def on_plugin_loaded(self): asyncio.create_task(monitor_service.start())
"""插件加载完成后的回调,启动异步服务""" logger.info("MaiZone后台监控和定时任务已启动。")
if hasattr(self, "scheduler_service") and hasattr(self, "monitor_service"):
asyncio.create_task(self.scheduler_service.start())
asyncio.create_task(self.monitor_service.start())
logger.info("MaiZone后台任务已启动。")
def get_plugin_components(self) -> List[Tuple[ComponentInfo, Type]]: def get_plugin_components(self) -> List[Tuple[ComponentInfo, Type]]:
return [ return [

View File

@@ -113,31 +113,32 @@ class CookieService:
async def get_cookies(self, qq_account: str, stream_id: Optional[str]) -> Optional[Dict[str, str]]: async def get_cookies(self, qq_account: str, stream_id: Optional[str]) -> Optional[Dict[str, str]]:
""" """
获取Cookie按以下顺序尝试 获取Cookie按以下顺序尝试
1. Adapter API 1. HTTP备用端点 (更稳定)
2. HTTP备用端点 2. 本地文件缓存
3. 本地文件缓存 3. Adapter API (作为最后手段)
""" """
# 1. 尝试从Adapter获取 # 1. 尝试从HTTP备用端点获取
cookies = await self._get_cookies_from_adapter(stream_id) logger.info(f"开始尝试从HTTP备用地址获取 {qq_account} 的Cookie...")
if cookies:
logger.info("成功从Adapter获取Cookie。")
self._save_cookies_to_file(qq_account, cookies)
return cookies
# 2. 尝试从HTTP备用端点获取
logger.warning("从Adapter获取Cookie失败尝试使用HTTP备用地址。")
cookies = await self._get_cookies_from_http() cookies = await self._get_cookies_from_http()
if cookies: if cookies:
logger.info("成功从HTTP备用地址获取Cookie。") logger.info(f"成功从HTTP备用地址{qq_account} 获取Cookie。")
self._save_cookies_to_file(qq_account, cookies) self._save_cookies_to_file(qq_account, cookies)
return cookies return cookies
# 3. 尝试从本地文件加载 # 2. 尝试从本地文件加载
logger.warning("从HTTP备用地址获取Cookie失败尝试加载本地缓存。") logger.warning(f"从HTTP备用地址获取 {qq_account}Cookie失败尝试加载本地缓存。")
cookies = self._load_cookies_from_file(qq_account) cookies = self._load_cookies_from_file(qq_account)
if cookies: if cookies:
logger.info("成功从本地文件加载缓存的Cookie。") logger.info(f"成功从本地文件{qq_account} 加载缓存的Cookie。")
return cookies return cookies
logger.error("所有Cookie获取方法均失败。") # 3. 尝试从Adapter获取 (作为最后的备用方案)
logger.warning(f"从本地缓存加载 {qq_account} 的Cookie失败最后尝试使用Adapter API。")
cookies = await self._get_cookies_from_adapter(stream_id)
if cookies:
logger.info(f"成功从Adapter API为 {qq_account} 获取Cookie。")
self._save_cookies_to_file(qq_account, cookies)
return cookies
logger.error(f"{qq_account} 获取Cookie的所有方法均失败。请确保Napcat HTTP服务或Adapter连接至少有一个正常工作或存在有效的本地Cookie文件。")
return None return None

View File

@@ -409,8 +409,9 @@ class QZoneService:
cookie_dir.mkdir(exist_ok=True) cookie_dir.mkdir(exist_ok=True)
cookie_file_path = cookie_dir / f"cookies-{qq_account}.json" cookie_file_path = cookie_dir / f"cookies-{qq_account}.json"
# 优先尝试通过Napcat HTTP服务获取最新的Cookie
try: try:
# 使用HTTP服务器方式获取Cookie logger.info("尝试通过Napcat HTTP服务获取Cookie...")
host = self.get_config("cookie.http_fallback_host", "172.20.130.55") host = self.get_config("cookie.http_fallback_host", "172.20.130.55")
port = self.get_config("cookie.http_fallback_port", "9999") port = self.get_config("cookie.http_fallback_port", "9999")
napcat_token = self.get_config("cookie.napcat_token", "") napcat_token = self.get_config("cookie.napcat_token", "")
@@ -421,23 +422,43 @@ class QZoneService:
parsed_cookies = { parsed_cookies = {
k.strip(): v.strip() for k, v in (p.split("=", 1) for p in cookie_str.split("; ") if "=" in p) k.strip(): v.strip() for k, v in (p.split("=", 1) for p in cookie_str.split("; ") if "=" in p)
} }
with open(cookie_file_path, "wb") as f: # 成功获取后,异步写入本地文件作为备份
f.write(orjson.dumps(parsed_cookies)) try:
logger.info(f"Cookie已更新并保存至: {cookie_file_path}") with open(cookie_file_path, "wb") as f:
f.write(orjson.dumps(parsed_cookies))
logger.info(f"通过Napcat服务成功更新Cookie并已保存至: {cookie_file_path}")
except Exception as e:
logger.warning(f"保存Cookie到文件时出错: {e}")
return parsed_cookies return parsed_cookies
else:
logger.warning("通过Napcat服务未能获取有效Cookie。")
# 如果HTTP获取失败尝试读取本地文件
if cookie_file_path.exists():
with open(cookie_file_path, "rb") as f:
return orjson.loads(f.read())
return None
except Exception as e: except Exception as e:
logger.error(f"更新或加载Cookie时发生异常: {e}") logger.warning(f"通过Napcat HTTP服务获取Cookie时发生异常: {e}。将尝试从本地文件加载。")
return None
async def _fetch_cookies_http(self, host: str, port: str, napcat_token: str) -> Optional[Dict]: # 如果通过服务获取失败,则尝试从本地文件加载
logger.info("尝试从本地Cookie文件加载...")
if cookie_file_path.exists():
try:
with open(cookie_file_path, "rb") as f:
cookies = orjson.loads(f.read())
logger.info(f"成功从本地文件加载Cookie: {cookie_file_path}")
return cookies
except Exception as e:
logger.error(f"从本地文件 {cookie_file_path} 读取或解析Cookie失败: {e}")
else:
logger.warning(f"本地Cookie文件不存在: {cookie_file_path}")
logger.error("所有获取Cookie的方式均失败。")
return None
async def _fetch_cookies_http(self, host: str, port: int, napcat_token: str) -> Optional[Dict]:
"""通过HTTP服务器获取Cookie""" """通过HTTP服务器获取Cookie"""
url = f"http://{host}:{port}/get_cookies" # 从配置中读取主机和端口,如果未提供则使用传入的参数
final_host = self.get_config("cookie.http_fallback_host", host)
final_port = self.get_config("cookie.http_fallback_port", port)
url = f"http://{final_host}:{final_port}/get_cookies"
max_retries = 5 max_retries = 5
retry_delay = 1 retry_delay = 1
@@ -481,14 +502,19 @@ class QZoneService:
async def _get_api_client(self, qq_account: str, stream_id: Optional[str]) -> Optional[Dict]: async def _get_api_client(self, qq_account: str, stream_id: Optional[str]) -> Optional[Dict]:
cookies = await self.cookie_service.get_cookies(qq_account, stream_id) cookies = await self.cookie_service.get_cookies(qq_account, stream_id)
if not cookies: if not cookies:
logger.error("获取API客户端失败未能获取到Cookie。请检查Napcat连接是否正常或是否存在有效的本地Cookie文件。")
return None return None
p_skey = cookies.get("p_skey") or cookies.get("p_skey".upper()) p_skey = cookies.get("p_skey") or cookies.get("p_skey".upper())
if not p_skey: if not p_skey:
logger.error(f"获取API客户端失败Cookie中缺少关键的 'p_skey'。Cookie内容: {cookies}")
return None return None
gtk = self._generate_gtk(p_skey) gtk = self._generate_gtk(p_skey)
uin = cookies.get("uin", "").lstrip("o") uin = cookies.get("uin", "").lstrip("o")
if not uin:
logger.error(f"获取API客户端失败Cookie中缺少关键的 'uin'。Cookie内容: {cookies}")
return None
async def _request(method, url, params=None, data=None, headers=None): async def _request(method, url, params=None, data=None, headers=None):
final_headers = {"referer": f"https://user.qzone.qq.com/{uin}", "origin": "https://user.qzone.qq.com"} final_headers = {"referer": f"https://user.qzone.qq.com/{uin}", "origin": "https://user.qzone.qq.com"}

View File

@@ -185,9 +185,13 @@ class SendHandler:
logger.info(f"执行适配器命令: {action}") logger.info(f"执行适配器命令: {action}")
# 直接向Napcat发送命令并获取响应 # 根据action决定处理方式
response_task = asyncio.create_task(self.send_message_to_napcat(action, params)) if action == "get_cookies":
response = await response_task # 对于get_cookies我们需要一个更长的超时时间
response = await self.send_message_to_napcat(action, params, timeout=40.0)
else:
# 对于其他命令,使用默认超时
response = await self.send_message_to_napcat(action, params)
# 发送响应回MaiBot # 发送响应回MaiBot
await self.send_adapter_command_response(raw_message_base, response, request_id) await self.send_adapter_command_response(raw_message_base, response, request_id)
@@ -196,6 +200,8 @@ class SendHandler:
logger.info(f"适配器命令 {action} 执行成功") logger.info(f"适配器命令 {action} 执行成功")
else: else:
logger.warning(f"适配器命令 {action} 执行失败napcat返回{str(response)}") logger.warning(f"适配器命令 {action} 执行失败napcat返回{str(response)}")
# 无论成功失败,都记录下完整的响应内容以供调试
logger.debug(f"适配器命令 {action} 的完整响应: {response}")
except Exception as e: except Exception as e:
logger.error(f"处理适配器命令时发生错误: {e}") logger.error(f"处理适配器命令时发生错误: {e}")
@@ -583,7 +589,7 @@ class SendHandler:
}, },
) )
async def send_message_to_napcat(self, action: str, params: dict) -> dict: async def send_message_to_napcat(self, action: str, params: dict, timeout: float = 20.0) -> dict:
request_uuid = str(uuid.uuid4()) request_uuid = str(uuid.uuid4())
payload = json.dumps({"action": action, "params": params, "echo": request_uuid}) payload = json.dumps({"action": action, "params": params, "echo": request_uuid})
@@ -595,9 +601,9 @@ class SendHandler:
try: try:
await connection.send(payload) await connection.send(payload)
response = await get_response(request_uuid) response = await get_response(request_uuid, timeout=timeout) # 使用传入的超时时间
except TimeoutError: except TimeoutError:
logger.error("发送消息超时,未收到响应") logger.error(f"发送消息超时{timeout}秒),未收到响应: action={action}, params={params}")
return {"status": "error", "message": "timeout"} return {"status": "error", "message": "timeout"}
except Exception as e: except Exception as e:
logger.error(f"发送消息失败: {e}") logger.error(f"发送消息失败: {e}")

View File

@@ -1,83 +0,0 @@
{
"manifest_version": 1,
"name": "戳一戳插件 (Poke Plugin)",
"version": "1.0.0",
"description": "智能戳一戳互动插件支持主动戳用户和自动反戳机制提供多种反应模式包括AI回复",
"author": {
"name": "MaiBot-Plus开发团队",
"url": "https://github.com/MaiBot-Plus"
},
"license": "GPL-v3.0-or-later",
"host_application": {
"min_version": "0.8.0"
},
"homepage_url": "https://github.com/MoFox-Studio/MoFox_Bot",
"repository_url": "https://github.com/MoFox-Studio/MoFox_Bot",
"keywords": ["poke", "interaction", "fun", "social", "ai-reply", "auto-response"],
"categories": ["Social", "Interactive", "Fun"],
"default_locale": "zh-CN",
"locales_path": "_locales",
"plugin_info": {
"is_built_in": false,
"plugin_type": "interactive",
"components": [
{
"type": "action",
"name": "poke_user",
"description": "向指定用户发送戳一戳动作",
"parameters": {
"user_name": "需要戳一戳的用户名称",
"times": "戳一戳次数可选默认为1"
},
"activation_modes": ["llm_judge", "manual"],
"llm_conditions": [
"用户明确要求使用戳一戳",
"想以有趣的方式提醒或与某人互动",
"上下文明确需要戳一个或多个人"
]
},
{
"type": "command",
"name": "poke_back",
"description": "检测戳一戳消息并自动反戳",
"pattern": "(?P<poker_name>\\S+)\\s*戳了戳\\s*(?P<target_name>\\S+)",
"features": [
"正则表达式匹配戳一戳文本",
"智能识别戳击目标",
"防抖机制避免频繁反戳",
"多种反戳模式选择"
]
}
],
"features": [
"主动戳一戳功能 - 可指定用户和次数",
"智能反戳机制 - 自动检测并回应戳一戳",
"多种反戳模式 - 戳回去/AI回复/随机选择",
"AI智能回复 - 集成回复生成器API",
"冷却时间控制 - 防止频繁反戳",
"灵活配置选项 - 支持自定义回复消息",
"安全过滤机制 - 只对针对机器人的戳一戳反应"
],
"configuration": {
"poke_back_mode": {
"type": "string",
"options": ["poke", "reply", "random"],
"default": "poke",
"description": "反戳模式选择"
},
"poke_back_cooldown": {
"type": "integer",
"default": 5,
"description": "反戳冷却时间(秒)"
},
"enable_typo_in_reply": {
"type": "boolean",
"default": false,
"description": "AI回复时是否启用错字生成"
}
}
}
}

View File

@@ -1,307 +0,0 @@
import asyncio
import random
from typing import List, Tuple, Type
from src.plugin_system import (
BasePlugin,
register_plugin,
BaseAction,
BaseCommand,
ComponentInfo,
ActionActivationType,
ConfigField,
)
from src.common.logger import get_logger
from src.person_info.person_info import get_person_info_manager
from src.plugin_system.apis import generator_api
logger = get_logger("poke_plugin")
# ===== Action组件 =====
class PokeAction(BaseAction):
"""发送戳一戳动作"""
# === 基本信息(必须填写)===
action_name = "poke_user"
action_description = "向用户发送戳一戳"
activation_type = ActionActivationType.ALWAYS
parallel_action = True
# === 功能描述(必须填写)===
action_parameters = {
"user_name": "需要戳一戳的用户的名字 (可选)",
"user_id": "需要戳一戳的用户的ID (可选,优先级更高)",
"times": "需要戳一戳的次数 (默认为 1)",
}
action_require = ["当需要戳某个用户时使用", "当你想提醒特定用户时使用"]
llm_judge_prompt = """
判定是否需要使用戳一戳动作的条件:
1. 用户明确要求使用戳一戳。
2. 你想以一种有趣的方式提醒或与某人互动。
3. 上下文明确需要你戳一个或多个人。
请回答""""
"""
associated_types = ["text"]
async def execute(self) -> Tuple[bool, str]:
"""执行戳一戳的动作"""
user_id = self.action_data.get("user_id")
user_name = self.action_data.get("user_name")
try:
times = int(self.action_data.get("times", 1))
except (ValueError, TypeError):
times = 1
# 优先使用 user_id
if not user_id:
if not user_name:
logger.warning("戳一戳动作缺少 'user_id''user_name' 参数。")
return False, "缺少用户标识参数"
# 备用方案:通过 user_name 查找
user_info = await get_person_info_manager().get_person_info_by_name(user_name)
if not user_info or not user_info.get("user_id"):
logger.info(f"找不到名为 '{user_name}' 的用户。")
return False, f"找不到名为 '{user_name}' 的用户"
user_id = user_info.get("user_id")
display_name = user_name or user_id
for i in range(times):
logger.info(f"正在向 {display_name} ({user_id}) 发送第 {i + 1}/{times} 次戳一戳...")
await self.send_command(
"SEND_POKE", args={"qq_id": user_id}, display_message=f"戳了戳 {display_name} ({i + 1}/{times})"
)
# 添加一个小的延迟,以避免发送过快
await asyncio.sleep(0.5)
success_message = f"已向 {display_name} 发送 {times} 次戳一戳。"
await self.store_action_info(
action_build_into_prompt=True, action_prompt_display=success_message, action_done=True
)
return True, success_message
# ===== Command组件 =====
class PokeBackCommand(BaseCommand):
"""反戳命令组件"""
command_name = "poke_back"
command_description = "检测到戳一戳时自动反戳回去"
# 匹配戳一戳的正则表达式 - 匹配 "xxx戳了戳xxx" 的格式
command_pattern = r"(?P<poker_name>\S+)\s*戳了戳\s*(?P<target_name>\S+)"
async def execute(self) -> Tuple[bool, str, bool]:
"""执行反戳逻辑"""
# 检查反戳功能是否启用
if not self.get_config("components.command_poke_back", True):
return False, "", False
# 获取匹配的用户名
poker_name = self.matched_groups.get("poker_name", "")
target_name = self.matched_groups.get("target_name", "")
if not poker_name or not target_name:
logger.debug("戳一戳消息格式不匹配,跳过反戳")
return False, "", False
# 只有当目标是机器人自己时才反戳
if target_name not in ["", "bot", "机器人", "麦麦"]:
logger.debug(f"戳一戳目标不是机器人 ({target_name}), 跳过反戳")
return False, "", False
# 获取戳我的用户信息
poker_info = await get_person_info_manager().get_person_info_by_name(poker_name)
if not poker_info or not poker_info.get("user_id"):
logger.info(f"找不到名为 '{poker_name}' 的用户信息,无法反戳")
return False, "", False
poker_id = poker_info.get("user_id")
if not isinstance(poker_id, (int, str)):
logger.error(f"获取到的用户ID类型不正确: {type(poker_id)}")
return False, "", False
# 确保poker_id是整数类型
try:
poker_id = int(poker_id)
except (ValueError, TypeError):
logger.error(f"无法将用户ID转换为整数: {poker_id}")
return False, "", False
# 检查反戳冷却时间(防止频繁反戳)
cooldown_seconds = self.get_config("components.poke_back_cooldown", 5)
current_time = asyncio.get_event_loop().time()
# 使用类变量存储上次反戳时间
if not hasattr(PokeBackCommand, "_last_poke_back_time"):
PokeBackCommand._last_poke_back_time = {}
last_time = PokeBackCommand._last_poke_back_time.get(poker_id, 0)
if current_time - last_time < cooldown_seconds:
logger.info(f"反戳冷却中,跳过对 {poker_name} 的反戳")
return False, "", False
# 记录本次反戳时间
PokeBackCommand._last_poke_back_time[poker_id] = current_time
# 执行反戳
logger.info(f"检测到 {poker_name} 戳了我,准备反戳回去")
try:
# 获取反戳模式
poke_back_mode = self.get_config("components.poke_back_mode", "poke") # "poke", "reply", "random"
if poke_back_mode == "random":
# 随机选择模式
poke_back_mode = random.choice(["poke", "reply"])
if poke_back_mode == "poke":
# 戳回去模式
await self._poke_back(poker_id, poker_name)
elif poke_back_mode == "reply":
# 回复模式
await self._reply_back(poker_name)
else:
logger.warning(f"未知的反戳模式: {poke_back_mode}")
return False, "", False
logger.info(f"成功反戳了 {poker_name} (模式: {poke_back_mode})")
return True, f"反戳了 {poker_name}", False # 不拦截消息继续处理
except Exception as e:
logger.error(f"反戳失败: {e}")
return False, "", False
async def _poke_back(self, poker_id: int, poker_name: str):
"""执行戳一戳反击"""
await self.send_command(
"SEND_POKE",
args={"qq_id": poker_id},
display_message=f"反戳了 {poker_name}",
storage_message=False, # 不存储到消息历史中
)
# 可选:发送一个随机的反戳回复
poke_back_messages = self.get_config(
"components.poke_back_messages",
[
"哼,戳回去!",
"戳我干嘛~",
"反戳!",
"你戳我,我戳你!",
"(戳回去)",
],
)
if poke_back_messages and self.get_config("components.send_poke_back_message", False):
reply_message = random.choice(poke_back_messages)
await self.send_text(reply_message)
async def _reply_back(self, poker_name: str):
"""生成AI回复"""
# 构造回复上下文
extra_info = f"{poker_name}戳了我一下,需要生成一个有趣的回应。"
# 获取配置,确保类型正确
enable_typo = self.get_config("components.enable_typo_in_reply", False)
if not isinstance(enable_typo, bool):
enable_typo = False
# 使用generator_api生成回复
success, reply_set, _ = await generator_api.generate_reply(
chat_stream=self.message.chat_stream,
extra_info=extra_info,
enable_tool=False,
enable_splitter=True,
enable_chinese_typo=enable_typo,
from_plugin=True,
)
if success and reply_set:
# 发送生成的回复
for reply_item in reply_set:
message_type, content = reply_item
if message_type == "text":
await self.send_text(content)
else:
await self.send_type(message_type, content)
else:
# 如果AI回复失败发送一个默认回复
fallback_messages = self.get_config(
"components.fallback_reply_messages",
[
"被戳了!",
"诶?",
"做什么呢~",
"怎么了?",
],
)
# 确保fallback_messages是列表
if isinstance(fallback_messages, list) and fallback_messages:
fallback_reply = random.choice(fallback_messages)
await self.send_text(fallback_reply)
else:
await self.send_text("被戳了!")
# ===== 插件注册 =====
@register_plugin
class PokePlugin(BasePlugin):
"""戳一戳插件"""
# 插件基本信息
plugin_name: str = "poke_plugin"
enable_plugin: bool = True
dependencies: List[str] = []
python_dependencies: List[str] = []
config_file_name: str = "config.toml"
# 配置节描述
config_section_descriptions = {"plugin": "插件基本信息", "components": "插件组件"}
# 配置Schema定义
config_schema: dict = {
"plugin": {
"name": ConfigField(type=str, default="poke_plugin", description="插件名称"),
"version": ConfigField(type=str, default="1.0.0", description="插件版本"),
"enabled": ConfigField(type=bool, default=True, description="是否启用插件"),
"config_version": ConfigField(type=str, default="1.0", description="配置版本"),
},
"components": {
"action_poke_user": ConfigField(type=bool, default=True, description="是否启用戳一戳功能"),
"command_poke_back": ConfigField(type=bool, default=True, description="是否启用反戳功能"),
"poke_back_mode": ConfigField(
type=str, default="poke", description="反戳模式: poke(戳回去), reply(AI回复), random(随机)"
),
"poke_back_cooldown": ConfigField(type=int, default=5, description="反戳冷却时间(秒)"),
"send_poke_back_message": ConfigField(type=bool, default=False, description="戳回去时是否发送文字回复"),
"enable_typo_in_reply": ConfigField(type=bool, default=False, description="AI回复时是否启用错字生成"),
"poke_back_messages": ConfigField(
type=list,
default=["哼,戳回去!", "戳我干嘛~", "反戳!", "你戳我,我戳你!", "(戳回去)"],
description="戳回去时的随机回复消息列表",
),
"fallback_reply_messages": ConfigField(
type=list,
default=["被戳了!", "诶?", "做什么呢~", "怎么了?"],
description="AI回复失败时的备用回复消息列表",
),
},
}
def get_plugin_components(self) -> List[Tuple[ComponentInfo, Type]]:
components = []
# 添加戳一戳动作组件
if self.get_config("components.action_poke_user"):
components.append((PokeAction.get_action_info(), PokeAction))
# 添加反戳命令组件
if self.get_config("components.command_poke_back"):
components.append((PokeBackCommand.get_command_info(), PokeBackCommand))
return components

View File

@@ -1,9 +0,0 @@
{
"manifest_version": 1,
"name": "智能提醒插件",
"version": "1.0.0",
"description": "一个能从对话中智能识别并设置定时提醒的插件。",
"author": {
"name": "墨墨"
}
}

View File

@@ -1,341 +0,0 @@
import asyncio
from datetime import datetime
from typing import List, Tuple, Type, Optional
from dateutil.parser import parse as parse_datetime
from src.common.logger import get_logger
from src.manager.async_task_manager import AsyncTask, async_task_manager
from src.person_info.person_info import get_person_info_manager
from src.plugin_system import (
BaseAction,
ActionInfo,
BasePlugin,
register_plugin,
ActionActivationType,
)
from src.plugin_system.apis import send_api, llm_api, generator_api
from src.plugin_system.base.component_types import ComponentType
logger = get_logger(__name__)
# ============================ AsyncTask ============================
class ReminderTask(AsyncTask):
def __init__(self, delay: float, stream_id: str, group_id: Optional[str], is_group: bool, target_user_id: str, target_user_name: str, event_details: str, creator_name: str, chat_stream: "ChatStream"):
super().__init__(task_name=f"ReminderTask_{target_user_id}_{datetime.now().timestamp()}")
self.delay = delay
self.stream_id = stream_id
self.group_id = group_id
self.is_group = is_group
self.target_user_id = target_user_id
self.target_user_name = target_user_name
self.event_details = event_details
self.creator_name = creator_name
self.chat_stream = chat_stream
async def run(self):
try:
if self.delay > 0:
logger.info(f"等待 {self.delay:.2f} 秒后执行提醒...")
await asyncio.sleep(self.delay)
logger.info(f"执行提醒任务: 给 {self.target_user_name} 发送关于 '{self.event_details}' 的提醒")
extra_info = f"现在是提醒时间,请你以一种符合你人设的、俏皮的方式提醒 {self.target_user_name}\n提醒内容: {self.event_details}\n设置提醒的人: {self.creator_name}"
success, reply_set, _ = await generator_api.generate_reply(
chat_stream=self.chat_stream,
extra_info=extra_info,
reply_message=self.chat_stream.context.get_last_message().to_dict(),
request_type="plugin.reminder.remind_message"
)
if success and reply_set:
for i, (_, text) in enumerate(reply_set):
if self.is_group:
message_payload = []
if i == 0:
message_payload.append({"type": "at", "data": {"qq": self.target_user_id}})
message_payload.append({"type": "text", "data": {"text": f" {text}"}})
await send_api.adapter_command_to_stream(
action="send_group_msg",
params={"group_id": self.group_id, "message": message_payload},
stream_id=self.stream_id
)
else:
await send_api.text_to_stream(text=text, stream_id=self.stream_id)
else:
# Fallback message
reminder_text = f"叮咚!这是 {self.creator_name} 让我准时提醒你的事情:\n\n{self.event_details}"
if self.is_group:
message_payload = [
{"type": "at", "data": {"qq": self.target_user_id}},
{"type": "text", "data": {"text": f" {reminder_text}"}}
]
await send_api.adapter_command_to_stream(
action="send_group_msg",
params={"group_id": self.group_id, "message": message_payload},
stream_id=self.stream_id
)
else:
await send_api.text_to_stream(text=reminder_text, stream_id=self.stream_id)
logger.info(f"提醒任务 {self.task_name} 成功完成。")
except Exception as e:
logger.error(f"执行提醒任务 {self.task_name} 时出错: {e}", exc_info=True)
# =============================== Actions ===============================
class RemindAction(BaseAction):
"""一个能从对话中智能识别并设置定时提醒的动作。"""
# === 基本信息 ===
action_name = "set_reminder"
action_description = "根据用户的对话内容,智能地设置一个未来的提醒事项。"
@staticmethod
def get_action_info() -> ActionInfo:
return ActionInfo(
name="set_reminder",
component_type=ComponentType.ACTION,
activation_type=ActionActivationType.KEYWORD,
activation_keywords=["提醒", "叫我", "记得", "别忘了"]
)
# === LLM 判断与参数提取 ===
llm_judge_prompt = ""
action_parameters = {}
action_require = [
"当用户请求在未来的某个时间点提醒他/她或别人某件事时使用",
"适用于包含明确时间信息和事件描述的对话",
"例如:'10分钟后提醒我收快递''明天早上九点喊一下李四参加晨会'"
]
async def execute(self) -> Tuple[bool, str]:
"""执行设置提醒的动作"""
try:
# 获取所有可用的模型配置
available_models = llm_api.get_available_models()
if "planner" not in available_models:
raise ValueError("未找到 'planner' 决策模型配置,无法解析时间")
model_to_use = available_models["planner"]
bot_name = self.chat_stream.user_info.user_nickname
prompt = f"""
从以下用户输入中提取提醒事件的关键信息。
用户输入: "{self.chat_stream.context.message.processed_plain_text}"
Bot的名字是: "{bot_name}"
请仔细分析句子结构以确定谁是提醒的真正目标。Bot自身不应被视为被提醒人。
请以JSON格式返回提取的信息包含以下字段:
- "user_name": 需要被提醒的人的姓名。如果未指定,则默认为"自己"
- "remind_time": 描述提醒时间的自然语言字符串。
- "event_details": 需要提醒的具体事件内容。
示例:
- 用户输入: "提醒我十分钟后开会" -> {{"user_name": "自己", "remind_time": "十分钟后", "event_details": "开会"}}
- 用户输入: "{bot_name},提醒一闪一分钟后睡觉" -> {{"user_name": "一闪", "remind_time": "一分钟后", "event_details": "睡觉"}}
如果无法提取完整信息请返回一个包含空字符串的JSON对象例如{{"user_name": "", "remind_time": "", "event_details": ""}}
"""
success, response, _, _ = await llm_api.generate_with_model(
prompt,
model_config=model_to_use,
request_type="plugin.reminder.parameter_extractor"
)
if not success or not response:
raise ValueError(f"LLM未能返回有效的参数: {response}")
import json
import re
try:
# 提取JSON部分
json_match = re.search(r"\{.*\}", response, re.DOTALL)
if not json_match:
raise ValueError("LLM返回的内容中不包含JSON")
action_data = json.loads(json_match.group(0))
except json.JSONDecodeError:
logger.error(f"[ReminderPlugin] LLM返回的不是有效的JSON: {response}")
return False, "LLM返回的不是有效的JSON"
user_name = action_data.get("user_name")
remind_time_str = action_data.get("remind_time")
event_details = action_data.get("event_details")
except Exception as e:
logger.error(f"[ReminderPlugin] 解析参数时出错: {e}", exc_info=True)
return False, "解析参数时出错"
if not all([user_name, remind_time_str, event_details]):
missing_params = [p for p, v in {"user_name": user_name, "remind_time": remind_time_str, "event_details": event_details}.items() if not v]
error_msg = f"缺少必要的提醒参数: {', '.join(missing_params)}"
logger.warning(f"[ReminderPlugin] LLM未能提取完整参数: {error_msg}")
return False, error_msg
# 1. 解析时间
try:
assert isinstance(remind_time_str, str)
# 优先尝试直接解析
try:
target_time = parse_datetime(remind_time_str, fuzzy=True)
except Exception:
# 如果直接解析失败,调用 LLM 进行转换
logger.info(f"[ReminderPlugin] 直接解析时间 '{remind_time_str}' 失败,尝试使用 LLM 进行转换...")
# 获取所有可用的模型配置
available_models = llm_api.get_available_models()
if "planner" not in available_models:
raise ValueError("未找到 'planner' 决策模型配置,无法解析时间")
# 明确使用 'planner' 模型
model_to_use = available_models["planner"]
# 在执行时动态获取当前时间
current_time_str = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
prompt = (
f"请将以下自然语言时间短语转换为一个未来的、标准的 'YYYY-MM-DD HH:MM:SS' 格式。"
f"请只输出转换后的时间字符串,不要包含任何其他说明或文字。\n"
f"作为参考,当前时间是: {current_time_str}\n"
f"需要转换的时间短语是: '{remind_time_str}'\n"
f"规则:\n"
f"- 如果用户没有明确指出是上午还是下午请根据当前时间判断。例如如果当前是上午用户说8点则应理解为今天的8点如果当前是下午用户说8点则应理解为今天的20点。\n"
f"- 如果转换后的时间早于当前时间,则应理解为第二天的时间。\n"
f"示例:\n"
f"- 当前时间: 2025-09-16 10:00:00, 用户说: '8点' -> '2025-09-17 08:00:00'\n"
f"- 当前时间: 2025-09-16 14:00:00, 用户说: '8点' -> '2025-09-16 20:00:00'\n"
f"- 当前时间: 2025-09-16 23:00:00, 用户说: '晚上10点' -> '2025-09-17 22:00:00'"
)
success, response, _, _ = await llm_api.generate_with_model(
prompt,
model_config=model_to_use,
request_type="plugin.reminder.time_parser"
)
if not success or not response:
raise ValueError(f"LLM未能返回有效的时间字符串: {response}")
converted_time_str = response.strip()
logger.info(f"[ReminderPlugin] LLM 转换结果: '{converted_time_str}'")
target_time = parse_datetime(converted_time_str, fuzzy=False)
except Exception as e:
logger.error(f"[ReminderPlugin] 无法解析或转换时间字符串 '{remind_time_str}': {e}", exc_info=True)
await self.send_text(f"抱歉,我无法理解您说的时间 '{remind_time_str}',提醒设置失败。")
return False, f"无法解析时间 '{remind_time_str}'"
now = datetime.now()
if target_time <= now:
await self.send_text("提醒时间必须是一个未来的时间点哦,提醒设置失败。")
return False, "提醒时间必须在未来"
delay_seconds = (target_time - now).total_seconds()
# 2. 解析用户
person_manager = get_person_info_manager()
user_id_to_remind = None
user_name_to_remind = ""
assert isinstance(user_name, str)
if user_name.strip() in ["自己", "", "me"]:
user_id_to_remind = self.user_id
user_name_to_remind = self.user_nickname
else:
# 1. 精确匹配
user_info = await person_manager.get_person_info_by_name(user_name)
# 2. 包含匹配
if not user_info:
for person_id, name in person_manager.person_name_list.items():
if user_name in name:
user_info = await person_manager.get_values(person_id, ["user_id", "user_nickname"])
break
# 3. 模糊匹配 (此处简化为字符串相似度)
if not user_info:
best_match = None
highest_similarity = 0
for person_id, name in person_manager.person_name_list.items():
import difflib
similarity = difflib.SequenceMatcher(None, user_name, name).ratio()
if similarity > highest_similarity:
highest_similarity = similarity
best_match = person_id
if best_match and highest_similarity > 0.6: # 相似度阈值
user_info = await person_manager.get_values(best_match, ["user_id", "user_nickname"])
if not user_info or not user_info.get("user_id"):
logger.warning(f"[ReminderPlugin] 找不到名为 '{user_name}' 的用户")
await self.send_text(f"抱歉,我的联系人里找不到叫做 '{user_name}' 的人,提醒设置失败。")
return False, f"用户 '{user_name}' 不存在"
user_id_to_remind = user_info.get("user_id")
user_name_to_remind = user_info.get("user_nickname") or user_name
# 3. 创建并调度异步任务
try:
assert user_id_to_remind is not None
assert event_details is not None
reminder_task = ReminderTask(
delay=delay_seconds,
stream_id=self.chat_stream.stream_id,
group_id=self.chat_stream.group_info.group_id if self.is_group and self.chat_stream.group_info else None,
is_group=self.is_group,
target_user_id=str(user_id_to_remind),
target_user_name=str(user_name_to_remind),
event_details=str(event_details),
creator_name=str(self.user_nickname),
chat_stream=self.chat_stream
)
await async_task_manager.add_task(reminder_task)
# 4. 生成并发送确认消息
extra_info = f"你已经成功设置了一个提醒,请以一种符合你人设的、俏皮的方式回复用户。\n提醒时间: {target_time.strftime('%Y-%m-%d %H:%M:%S')}\n提醒对象: {user_name_to_remind}\n提醒内容: {event_details}"
last_message = self.chat_stream.context.get_last_message()
success, reply_set, _ = await generator_api.generate_reply(
chat_stream=self.chat_stream,
extra_info=extra_info,
reply_message=last_message.to_dict(),
request_type="plugin.reminder.confirm_message"
)
if success and reply_set:
for _, text in reply_set:
await self.send_text(text)
else:
# Fallback message
fallback_message = f"好的,我记下了。\n将在 {target_time.strftime('%Y-%m-%d %H:%M:%S')} 提醒 {user_name_to_remind}\n{event_details}"
await self.send_text(fallback_message)
return True, "提醒设置成功"
except Exception as e:
logger.error(f"[ReminderPlugin] 创建提醒任务时出错: {e}", exc_info=True)
await self.send_text("抱歉,设置提醒时发生了一点内部错误。")
return False, "设置提醒时发生内部错误"
# =============================== Plugin ===============================
@register_plugin
class ReminderPlugin(BasePlugin):
"""一个能从对话中智能识别并设置定时提醒的插件。"""
# --- 插件基础信息 ---
plugin_name = "reminder_plugin"
enable_plugin = True
dependencies = []
python_dependencies = []
config_file_name = "config.toml"
config_schema = {}
def get_plugin_components(self) -> List[Tuple[ActionInfo, Type[BaseAction]]]:
"""注册插件的所有功能组件。"""
return [
(RemindAction.get_action_info(), RemindAction)
]

View File

@@ -1,18 +0,0 @@
{
"manifest_version": 1,
"name": "Set Typing Status",
"description": "一个在LLM生成回复时设置私聊输入状态的插件。",
"version": "1.0.0",
"author": {
"name": "MoFox-Studio"
},
"license": "MIT",
"homepage_url": "",
"repository_url": "",
"keywords": ["typing", "status", "private chat"],
"categories": ["utility"],
"host_application": {
"min_version": "0.10.0"
}
}

View File

@@ -1,63 +0,0 @@
from typing import List, Tuple, Type
import logging
from src.plugin_system import (
BasePlugin,
register_plugin,
ComponentInfo,
BaseEventHandler,
EventType,
)
from src.plugin_system.base.base_event import HandlerResult
from src.plugin_system.apis import send_api
logger = logging.getLogger(__name__)
class SetTypingStatusHandler(BaseEventHandler):
"""在LLM处理私聊消息后设置“正在输入”状态的事件处理器。"""
handler_name = "set_typing_status_handler"
handler_description = "在LLM生成回复后将用户的聊天状态设置为“正在输入”。"
init_subscribe = [EventType.POST_LLM]
async def execute(self, params: dict) -> HandlerResult:
message = params.get("message")
if not message or not message.is_private_message:
return HandlerResult(success=True, continue_process=True)
user_id = message.message_info.user_info.user_id
if not user_id:
return HandlerResult(success=False, continue_process=True, message="无法获取用户ID")
try:
params = {"user_id": user_id, "event_type": 1}
await send_api.adapter_command_to_stream(
action="set_input_status",
params=params,
stream_id=message.stream_id,
)
logger.debug(f"成功为用户 {user_id} 设置“正在输入”状态。")
return HandlerResult(success=True, continue_process=True)
except Exception as e:
logger.error(f"为用户 {user_id} 设置“正在输入”状态时出错: {e}")
return HandlerResult(success=False, continue_process=True, message=str(e))
@register_plugin
class SetTypingStatusPlugin(BasePlugin):
"""一个在LLM生成回复时设置私聊输入状态的插件。"""
plugin_name = "set_typing_status"
enable_plugin = True
dependencies = []
python_dependencies = []
config_file_name = ""
config_schema = {}
def get_plugin_components(self) -> List[Tuple[ComponentInfo, Type]]:
"""注册插件的功能组件。"""
return [(SetTypingStatusHandler.get_handler_info(), SetTypingStatusHandler)]
def register_plugin(self) -> bool:
return True

View File

@@ -0,0 +1,25 @@
{
"manifest_version": 1,
"name": "MoFox-Bot工具箱",
"version": "1.0.0",
"description": "一个集合多种实用功能的插件,旨在提升聊天体验和效率。",
"author": {
"name": "MoFox-Studio",
"url": "https://github.com/MoFox-Studio"
},
"license": "GPL-v3.0-or-later",
"host_application": {
"min_version": "0.10.0"
},
"keywords": ["emoji", "reaction", "like", "表情", "回应", "点赞"],
"categories": ["Chat", "Integration"],
"default_locale": "zh-CN",
"locales_path": "_locales",
"plugin_info": {
"is_built_in": "true",
"plugin_type": "functional"
}
}

View File

@@ -0,0 +1,569 @@
import re
from typing import List, Tuple, Type, Optional
from src.plugin_system import (
BasePlugin,
register_plugin,
BaseAction,
ComponentInfo,
ActionActivationType,
ConfigField,
)
from src.common.logger import get_logger
from .qq_emoji_list import qq_face
from src.plugin_system.base.component_types import ChatType
from src.person_info.person_info import get_person_info_manager
from dateutil.parser import parse as parse_datetime
from src.manager.async_task_manager import AsyncTask, async_task_manager
from src.plugin_system.apis import send_api, llm_api, generator_api
from src.plugin_system.base.component_types import ComponentType
from typing import Optional
from src.chat.message_receive.chat_stream import ChatStream
import asyncio
import datetime
logger = get_logger("set_emoji_like_plugin")
# ============================ AsyncTask ============================
class ReminderTask(AsyncTask):
def __init__(
self,
delay: float,
stream_id: str,
group_id: Optional[str],
is_group: bool,
target_user_id: str,
target_user_name: str,
event_details: str,
creator_name: str,
chat_stream: ChatStream,
):
super().__init__(task_name=f"ReminderTask_{target_user_id}_{datetime.datetime.now().timestamp()}")
self.delay = delay
self.stream_id = stream_id
self.group_id = group_id
self.is_group = is_group
self.target_user_id = target_user_id
self.target_user_name = target_user_name
self.event_details = event_details
self.creator_name = creator_name
self.chat_stream = chat_stream
async def run(self):
try:
if self.delay > 0:
logger.info(f"等待 {self.delay:.2f} 秒后执行提醒...")
await asyncio.sleep(self.delay)
logger.info(f"执行提醒任务: 给 {self.target_user_name} 发送关于 '{self.event_details}' 的提醒")
extra_info = f"现在是提醒时间,请你以一种符合你人设的、俏皮的方式提醒 {self.target_user_name}\n提醒内容: {self.event_details}\n设置提醒的人: {self.creator_name}"
last_message = self.chat_stream.context_manager.context.get_last_message()
reply_message_dict = last_message.flatten() if last_message else None
success, reply_set, _ = await generator_api.generate_reply(
chat_stream=self.chat_stream,
extra_info=extra_info,
reply_message=reply_message_dict,
request_type="plugin.reminder.remind_message",
)
if success and reply_set:
for i, (_, text) in enumerate(reply_set):
if self.is_group:
message_payload = []
if i == 0:
message_payload.append({"type": "at", "data": {"qq": self.target_user_id}})
message_payload.append({"type": "text", "data": {"text": f" {text}"}})
await send_api.adapter_command_to_stream(
action="send_group_msg",
params={"group_id": self.group_id, "message": message_payload},
stream_id=self.stream_id,
)
else:
await send_api.text_to_stream(text=text, stream_id=self.stream_id)
else:
# Fallback message
reminder_text = f"叮咚!这是 {self.creator_name} 让我准时提醒你的事情:\n\n{self.event_details}"
if self.is_group:
message_payload = [
{"type": "at", "data": {"qq": self.target_user_id}},
{"type": "text", "data": {"text": f" {reminder_text}"}},
]
await send_api.adapter_command_to_stream(
action="send_group_msg",
params={"group_id": self.group_id, "message": message_payload},
stream_id=self.stream_id,
)
else:
await send_api.text_to_stream(text=reminder_text, stream_id=self.stream_id)
logger.info(f"提醒任务 {self.task_name} 成功完成。")
except Exception as e:
logger.error(f"执行提醒任务 {self.task_name} 时出错: {e}", exc_info=True)
# =============================== Actions ===============================
def get_emoji_id(emoji_input: str) -> str | None:
"""根据输入获取表情ID"""
# 如果输入本身就是数字ID直接返回
if emoji_input.isdigit() or (isinstance(emoji_input, str) and emoji_input.startswith("😊")):
if emoji_input in qq_face:
return emoji_input
# 尝试从 "[表情xxx]" 格式中提取
match = re.search(r"\[表情:(.+?)\]", emoji_input)
if match:
emoji_name = match.group(1).strip()
else:
emoji_name = emoji_input.strip()
# 遍历查找
for key, value in qq_face.items():
# value 的格式是 "[表情xxx]"
if f"[表情:{emoji_name}]" == value:
return key
return None
# ===== Action组件 =====
class PokeAction(BaseAction):
"""发送戳一戳动作"""
# === 基本信息(必须填写)===
action_name = "poke_user"
action_description = "向用户发送戳一戳"
activation_type = ActionActivationType.ALWAYS
parallel_action = True
# === 功能描述(必须填写)===
action_parameters = {
"user_name": "需要戳一戳的用户的名字 (可选)",
"user_id": "需要戳一戳的用户的ID (可选,优先级更高)",
"times": "需要戳一戳的次数 (默认为 1)",
}
action_require = ["当需要戳某个用户时使用", "当你想提醒特定用户时使用"]
llm_judge_prompt = """
判定是否需要使用戳一戳动作的条件:
1. **关键**: 这是一个高消耗的动作,请仅在绝对必要时使用,例如用户明确要求或作为提醒的关键部分。请极其谨慎地使用。
2. **用户请求**: 用户明确要求使用戳一戳。
3. **互动提醒**: 你想以一种有趣的方式提醒或与某人互动,但请确保这是对话的自然延伸,而不是无故打扰。
4. **上下文需求**: 上下文明确需要你戳一个或多个人。
5. **频率限制**: 如果最近已经戳过,或者用户情绪不高,请绝对不要使用。
请回答""""
"""
associated_types = ["text"]
async def execute(self) -> Tuple[bool, str]:
"""执行戳一戳的动作"""
user_id = self.action_data.get("user_id")
user_name = self.action_data.get("user_name")
try:
times = int(self.action_data.get("times", 1))
except (ValueError, TypeError):
times = 1
# 优先使用 user_id
if not user_id:
if not user_name:
logger.warning("戳一戳动作缺少 'user_id''user_name' 参数。")
return False, "缺少用户标识参数"
# 备用方案:通过 user_name 查找
user_info = await get_person_info_manager().get_person_info_by_name(user_name)
if not user_info or not user_info.get("user_id"):
logger.info(f"找不到名为 '{user_name}' 的用户。")
return False, f"找不到名为 '{user_name}' 的用户"
user_id = user_info.get("user_id")
display_name = user_name or user_id
for i in range(times):
logger.info(f"正在向 {display_name} ({user_id}) 发送第 {i + 1}/{times} 次戳一戳...")
await self.send_command(
"SEND_POKE", args={"qq_id": user_id}, display_message=f"戳了戳 {display_name} ({i + 1}/{times})"
)
# 添加一个小的延迟,以避免发送过快
await asyncio.sleep(0.5)
success_message = f"已向 {display_name} 发送 {times} 次戳一戳。"
await self.store_action_info(
action_build_into_prompt=True, action_prompt_display=success_message, action_done=True
)
return True, success_message
class SetEmojiLikeAction(BaseAction):
"""设置消息表情回应"""
# === 基本信息(必须填写)===
action_name = "set_emoji_like"
action_description = "为某条已经存在的消息添加‘贴表情’回应(类似点赞),而不是发送新消息。可以在觉得某条消息非常有趣、值得赞同或者需要特殊情感回应时主动使用。"
activation_type = ActionActivationType.ALWAYS # 消息接收时激活(?)
chat_type_allow = ChatType.GROUP
parallel_action = True
# === 功能描述(必须填写)===
# 从 qq_face 字典中提取所有表情名称用于提示
emoji_options = []
for name in qq_face.values():
match = re.search(r"\[表情:(.+?)\]", name)
if match:
emoji_options.append(match.group(1))
action_parameters = {
"set": "是否设置回应 (True/False)",
}
action_require = [
"当需要对一个已存在消息进行‘贴表情’回应时使用",
"这是一个对旧消息的操作,而不是发送新消息",
"如果你想发送一个新的表情包消息,请使用 'emoji' 动作",
]
llm_judge_prompt = """
判定是否需要使用贴表情动作的条件:
1. 用户明确要求使用贴表情包
2. 这是一个适合表达强烈情绪的场合
3. 不要发送太多表情包,如果你已经发送过多个表情包则回答""
请回答""""
"""
associated_types = ["text"]
async def execute(self) -> Tuple[bool, str]:
"""执行设置表情回应的动作"""
message_id = None
set_like = self.action_data.get("set", True)
if self.has_action_message:
logger.debug(str(self.action_message))
if isinstance(self.action_message, dict):
message_id = self.action_message.get("message_id")
logger.info(f"获取到的消息ID: {message_id}")
else:
logger.error("未提供消息ID")
await self.store_action_info(
action_build_into_prompt=True,
action_prompt_display=f"执行了set_emoji_like动作{self.action_name},失败: 未提供消息ID",
action_done=False,
)
return False, "未提供消息ID"
available_models = llm_api.get_available_models()
if "utils_small" not in available_models:
logger.error("未找到 'utils_small' 模型配置,无法选择表情")
return False, "表情选择功能配置错误"
model_to_use = available_models["utils_small"]
# 获取最近的对话历史作为上下文
context_text = ""
if self.action_message:
context_text = self.action_message.get("processed_plain_text", "")
else:
logger.error("无法找到动作选择的原始消息")
return False, "无法找到动作选择的原始消息"
prompt = (
f"根据以下这条消息,从列表中选择一个最合适的表情名称来回应这条消息。\n"
f"消息内容: '{context_text}'\n"
f"可用表情列表: {', '.join(self.emoji_options)}\n"
f"你的任务是:只输出你选择的表情的名称,不要包含任何其他文字或标点。\n"
f"例如,如果觉得应该用'',就只输出''"
)
success, response, _, _ = await llm_api.generate_with_model(
prompt, model_config=model_to_use, request_type="plugin.set_emoji_like.select_emoji"
)
if not success or not response:
logger.error("二级LLM未能选择有效的表情。")
return False, "无法找到合适的表情。"
chosen_emoji_name = response.strip()
logger.info(f"二级LLM选择的表情是: '{chosen_emoji_name}'")
emoji_id = get_emoji_id(chosen_emoji_name)
if not emoji_id:
logger.error(f"二级LLM选择的表情 '{chosen_emoji_name}' 仍然无法匹配到有效的表情ID。")
await self.store_action_info(
action_build_into_prompt=True,
action_prompt_display=f"执行了set_emoji_like动作{self.action_name},失败: 找不到表情: '{chosen_emoji_name}'",
action_done=False,
)
return False, f"找不到表情: '{chosen_emoji_name}'"
# 4. 使用适配器API发送命令
if not message_id:
logger.error("未提供消息ID")
await self.store_action_info(
action_build_into_prompt=True,
action_prompt_display=f"执行了set_emoji_like动作{self.action_name},失败: 未提供消息ID",
action_done=False,
)
return False, "未提供消息ID"
try:
# 使用适配器API发送贴表情命令
success = await self.send_command(
command_name="set_emoji_like",
args={"message_id": message_id, "emoji_id": emoji_id, "set": set_like},
storage_message=False,
)
if success:
logger.info("设置表情回应成功")
await self.store_action_info(
action_build_into_prompt=True,
action_prompt_display=f"执行了set_emoji_like动作,{chosen_emoji_name},设置表情回应: {emoji_id}, 是否设置: {set_like}",
action_done=True,
)
return True, "成功设置表情回应"
else:
logger.error("设置表情回应失败")
await self.store_action_info(
action_build_into_prompt=True,
action_prompt_display=f"执行了set_emoji_like动作{self.action_name},失败",
action_done=False,
)
return False, "设置表情回应失败"
except Exception as e:
logger.error(f"设置表情回应失败: {e}")
await self.store_action_info(
action_build_into_prompt=True,
action_prompt_display=f"执行了set_emoji_like动作{self.action_name},失败: {e}",
action_done=False,
)
return False, f"设置表情回应失败: {e}"
class RemindAction(BaseAction):
"""一个能从对话中智能识别并设置定时提醒的动作。"""
# === 基本信息 ===
action_name = "set_reminder"
action_description = "根据用户的对话内容,智能地设置一个未来的提醒事项。"
activation_type = (ActionActivationType.KEYWORD,)
activation_keywords = ["提醒", "叫我", "记得", "别忘了"]
chat_type_allow = ChatType.ALL
parallel_action = True
# === LLM 判断与参数提取 ===
llm_judge_prompt = ""
action_parameters = {}
action_require = [
"当用户请求在未来的某个时间点提醒他/她或别人某件事时使用",
"适用于包含明确时间信息和事件描述的对话",
"例如:'10分钟后提醒我收快递''明天早上九点喊一下李四参加晨会'",
]
async def execute(self) -> Tuple[bool, str]:
"""执行设置提醒的动作"""
user_name = self.action_data.get("user_name")
remind_time_str = self.action_data.get("remind_time")
event_details = self.action_data.get("event_details")
if not all([user_name, remind_time_str, event_details]):
missing_params = [
p
for p, v in {
"user_name": user_name,
"remind_time": remind_time_str,
"event_details": event_details,
}.items()
if not v
]
error_msg = f"缺少必要的提醒参数: {', '.join(missing_params)}"
logger.warning(f"[ReminderPlugin] LLM未能提取完整参数: {error_msg}")
return False, error_msg
# 1. 解析时间
try:
assert isinstance(remind_time_str, str)
# 优先尝试直接解析
try:
target_time = parse_datetime(remind_time_str, fuzzy=True)
except Exception:
# 如果直接解析失败,调用 LLM 进行转换
logger.info(f"[ReminderPlugin] 直接解析时间 '{remind_time_str}' 失败,尝试使用 LLM 进行转换...")
# 获取所有可用的模型配置
available_models = llm_api.get_available_models()
if "utils_small" not in available_models:
raise ValueError("未找到 'utils_small' 模型配置,无法解析时间")
# 明确使用 'planner' 模型
model_to_use = available_models["utils_small"]
# 在执行时动态获取当前时间
current_time_str = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
prompt = (
f"请将以下自然语言时间短语转换为一个未来的、标准的 'YYYY-MM-DD HH:MM:SS' 格式。"
f"请只输出转换后的时间字符串,不要包含任何其他说明或文字。\n"
f"作为参考,当前时间是: {current_time_str}\n"
f"需要转换的时间短语是: '{remind_time_str}'\n"
f"规则:\n"
f"- 如果用户没有明确指出是上午还是下午请根据当前时间判断。例如如果当前是上午用户说8点则应理解为今天的8点如果当前是下午用户说8点则应理解为今天的20点。\n"
f"- 如果转换后的时间早于当前时间,则应理解为第二天的时间。\n"
f"示例:\n"
f"- 当前时间: 2025-09-16 10:00:00, 用户说: '8点' -> '2025-09-17 08:00:00'\n"
f"- 当前时间: 2025-09-16 14:00:00, 用户说: '8点' -> '2025-09-16 20:00:00'\n"
f"- 当前时间: 2025-09-16 23:00:00, 用户说: '晚上10点' -> '2025-09-17 22:00:00'"
)
success, response, _, _ = await llm_api.generate_with_model(
prompt, model_config=model_to_use, request_type="plugin.reminder.time_parser"
)
if not success or not response:
raise ValueError(f"LLM未能返回有效的时间字符串: {response}")
converted_time_str = response.strip()
logger.info(f"[ReminderPlugin] LLM 转换结果: '{converted_time_str}'")
target_time = parse_datetime(converted_time_str, fuzzy=False)
except Exception as e:
logger.error(f"[ReminderPlugin] 无法解析或转换时间字符串 '{remind_time_str}': {e}", exc_info=True)
await self.send_text(f"抱歉,我无法理解您说的时间 '{remind_time_str}',提醒设置失败。")
return False, f"无法解析时间 '{remind_time_str}'"
now = datetime.datetime.now()
if target_time <= now:
await self.send_text("提醒时间必须是一个未来的时间点哦,提醒设置失败。")
return False, "提醒时间必须在未来"
delay_seconds = (target_time - now).total_seconds()
# 2. 解析用户
person_manager = get_person_info_manager()
user_id_to_remind = None
user_name_to_remind = ""
assert isinstance(user_name, str)
if user_name.strip() in ["自己", "", "me"]:
user_id_to_remind = self.user_id
user_name_to_remind = self.user_nickname
else:
# 1. 精确匹配
user_info = await person_manager.get_person_info_by_name(user_name)
# 2. 包含匹配
if not user_info:
for person_id, name in person_manager.person_name_list.items():
if user_name in name:
user_info = await person_manager.get_values(person_id, ["user_id", "user_nickname"])
break
# 3. 模糊匹配 (此处简化为字符串相似度)
if not user_info:
best_match = None
highest_similarity = 0
for person_id, name in person_manager.person_name_list.items():
import difflib
similarity = difflib.SequenceMatcher(None, user_name, name).ratio()
if similarity > highest_similarity:
highest_similarity = similarity
best_match = person_id
if best_match and highest_similarity > 0.6: # 相似度阈值
user_info = await person_manager.get_values(best_match, ["user_id", "user_nickname"])
if not user_info or not user_info.get("user_id"):
logger.warning(f"[ReminderPlugin] 找不到名为 '{user_name}' 的用户")
await self.send_text(f"抱歉,我的联系人里找不到叫做 '{user_name}' 的人,提醒设置失败。")
return False, f"用户 '{user_name}' 不存在"
user_id_to_remind = user_info.get("user_id")
user_name_to_remind = user_info.get("user_nickname") or user_name
# 3. 创建并调度异步任务
try:
assert user_id_to_remind is not None
assert event_details is not None
reminder_task = ReminderTask(
delay=delay_seconds,
stream_id=self.chat_stream.stream_id,
group_id=self.chat_stream.group_info.group_id
if self.is_group and self.chat_stream.group_info
else None,
is_group=self.is_group,
target_user_id=str(user_id_to_remind),
target_user_name=str(user_name_to_remind),
event_details=str(event_details),
creator_name=str(self.user_nickname),
chat_stream=self.chat_stream,
)
await async_task_manager.add_task(reminder_task)
# 4. 生成并发送确认消息
extra_info = f"你已经成功设置了一个提醒,请以一种符合你人设的、俏皮的方式回复用户。\n提醒时间: {target_time.strftime('%Y-%m-%d %H:%M:%S')}\n提醒对象: {user_name_to_remind}\n提醒内容: {event_details}"
last_message = self.chat_stream.context_manager.context.get_last_message()
reply_message_dict = last_message.flatten() if last_message else None
success, reply_set, _ = await generator_api.generate_reply(
chat_stream=self.chat_stream,
extra_info=extra_info,
reply_message=reply_message_dict,
request_type="plugin.reminder.confirm_message",
)
if success and reply_set:
for _, text in reply_set:
await self.send_text(text)
else:
# Fallback message
fallback_message = f"好的,我记下了。\n将在 {target_time.strftime('%Y-%m-%d %H:%M:%S')} 提醒 {user_name_to_remind}\n{event_details}"
await self.send_text(fallback_message)
return True, "提醒设置成功"
except Exception as e:
logger.error(f"[ReminderPlugin] 创建提醒任务时出错: {e}", exc_info=True)
await self.send_text("抱歉,设置提醒时发生了一点内部错误。")
return False, "设置提醒时发生内部错误"
# ===== 插件注册 =====
@register_plugin
class SetEmojiLikePlugin(BasePlugin):
"""一个集合多种实用功能的插件,旨在提升聊天体验和效率。"""
# 插件基本信息
plugin_name: str = "social_toolkit_plugin" # 内部标识符
enable_plugin: bool = True
dependencies: List[str] = [] # 插件依赖列表
python_dependencies: List[str] = [] # Python包依赖列表现在使用内置API
config_file_name: str = "config.toml" # 配置文件名
# 配置节描述
config_section_descriptions = {"plugin": "插件基本信息", "components": "插件组件"}
# 配置Schema定义
config_schema: dict = {
"plugin": {
"name": ConfigField(type=str, default="set_emoji_like", description="插件名称"),
"version": ConfigField(type=str, default="1.0.0", description="插件版本"),
"enabled": ConfigField(type=bool, default=True, description="是否启用插件"),
"config_version": ConfigField(type=str, default="1.1", description="配置版本"),
},
"components": {
"action_set_emoji_like": ConfigField(type=bool, default=True, description="是否启用设置表情回应功能"),
"action_poke_enable": ConfigField(type=bool, default=True, description="是否启用戳一戳功能"),
"action_set_reminder_enable": ConfigField(type=bool, default=True, description="是否启用定时提醒功能"),
},
}
def get_plugin_components(self) -> List[Tuple[ComponentInfo, Type]]:
enable_components = []
if self.get_config("components.action_set_emoji_like"):
enable_components.append((SetEmojiLikeAction.get_action_info(), SetEmojiLikeAction))
if self.get_config("components.action_poke_enable"):
enable_components.append((PokeAction.get_action_info(), PokeAction))
if self.get_config("components.action_set_reminder_enable"):
enable_components.append((RemindAction.get_action_info(), RemindAction))
return enable_components

View File

@@ -1,216 +0,0 @@
import asyncio
from datetime import datetime
from typing import List, Tuple, Type
from dateutil.parser import parse as parse_datetime
from src.common.logger import get_logger
from src.manager.async_task_manager import AsyncTask, async_task_manager
from src.person_info.person_info import get_person_info_manager
from src.plugin_system import (
BaseAction,
ActionInfo,
BasePlugin,
register_plugin,
ActionActivationType,
)
from src.plugin_system.apis import send_api
from src.plugin_system.base.component_types import ChatType
logger = get_logger(__name__)
# ============================ AsyncTask ============================
class ReminderTask(AsyncTask):
def __init__(
self,
delay: float,
stream_id: str,
is_group: bool,
target_user_id: str,
target_user_name: str,
event_details: str,
creator_name: str,
):
super().__init__(task_name=f"ReminderTask_{target_user_id}_{datetime.now().timestamp()}")
self.delay = delay
self.stream_id = stream_id
self.is_group = is_group
self.target_user_id = target_user_id
self.target_user_name = target_user_name
self.event_details = event_details
self.creator_name = creator_name
async def run(self):
try:
if self.delay > 0:
logger.info(f"等待 {self.delay:.2f} 秒后执行提醒...")
await asyncio.sleep(self.delay)
logger.info(f"执行提醒任务: 给 {self.target_user_name} 发送关于 '{self.event_details}' 的提醒")
reminder_text = f"叮咚!这是 {self.creator_name} 让我准时提醒你的事情:\n\n{self.event_details}"
if self.is_group:
# 在群聊中,构造 @ 消息段并发送
group_id = self.stream_id.split("_")[-1] if "_" in self.stream_id else self.stream_id
message_payload = [
{"type": "at", "data": {"qq": self.target_user_id}},
{"type": "text", "data": {"text": f" {reminder_text}"}},
]
await send_api.adapter_command_to_stream(
action="send_group_msg",
params={"group_id": group_id, "message": message_payload},
stream_id=self.stream_id,
)
else:
# 在私聊中,直接发送文本
await send_api.text_to_stream(text=reminder_text, stream_id=self.stream_id)
logger.info(f"提醒任务 {self.task_name} 成功完成。")
except Exception as e:
logger.error(f"执行提醒任务 {self.task_name} 时出错: {e}", exc_info=True)
# =============================== Actions ===============================
class RemindAction(BaseAction):
"""一个能从对话中智能识别并设置定时提醒的动作。"""
# === 基本信息 ===
action_name = "set_reminder"
action_description = "根据用户的对话内容,智能地设置一个未来的提醒事项。"
activation_type = ActionActivationType.LLM_JUDGE
chat_type_allow = ChatType.ALL
# === LLM 判断与参数提取 ===
llm_judge_prompt = """
判断用户是否意图设置一个未来的提醒。
- 必须包含明确的时间点或时间段如“十分钟后”、“明天下午3点”、“周五”
- 必须包含一个需要被提醒的事件。
- 可能会包含需要提醒的特定人物。
- 如果只是普通的聊天或询问时间,则不应触发。
示例:
- "半小时后提醒我开会" -> 是
- "明天下午三点叫张三来一下" -> 是
- "别忘了周五把报告交了" -> 是
- "现在几点了?" -> 否
- "我明天下午有空" -> 否
请只回答""""
"""
action_parameters = {
"user_name": "需要被提醒的人的称呼或名字,如果没有明确指定给某人,则默认为'自己'",
"remind_time": "描述提醒时间的自然语言字符串,例如'十分钟后''明天下午3点'",
"event_details": "需要提醒的具体事件内容",
}
action_require = [
"当用户请求在未来的某个时间点提醒他/她或别人某件事时使用",
"适用于包含明确时间信息和事件描述的对话",
"例如:'10分钟后提醒我收快递''明天早上九点喊一下李四参加晨会'",
]
async def execute(self) -> Tuple[bool, str]:
"""执行设置提醒的动作"""
user_name = self.action_data.get("user_name")
remind_time_str = self.action_data.get("remind_time")
event_details = self.action_data.get("event_details")
if not all([user_name, remind_time_str, event_details]):
missing_params = [
p
for p, v in {
"user_name": user_name,
"remind_time": remind_time_str,
"event_details": event_details,
}.items()
if not v
]
error_msg = f"缺少必要的提醒参数: {', '.join(missing_params)}"
logger.warning(f"[ReminderPlugin] LLM未能提取完整参数: {error_msg}")
return False, error_msg
# 1. 解析时间
try:
assert isinstance(remind_time_str, str)
target_time = parse_datetime(remind_time_str, fuzzy=True)
except Exception as e:
logger.error(f"[ReminderPlugin] 无法解析时间字符串 '{remind_time_str}': {e}")
await self.send_text(f"抱歉,我无法理解您说的时间 '{remind_time_str}',提醒设置失败。")
return False, f"无法解析时间 '{remind_time_str}'"
now = datetime.now()
if target_time <= now:
await self.send_text("提醒时间必须是一个未来的时间点哦,提醒设置失败。")
return False, "提醒时间必须在未来"
delay_seconds = (target_time - now).total_seconds()
# 2. 解析用户
person_manager = get_person_info_manager()
user_id_to_remind = None
user_name_to_remind = ""
assert isinstance(user_name, str)
if user_name.strip() in ["自己", "", "me"]:
user_id_to_remind = self.user_id
user_name_to_remind = self.user_nickname
else:
user_info = await person_manager.get_person_info_by_name(user_name)
if not user_info or not user_info.get("user_id"):
logger.warning(f"[ReminderPlugin] 找不到名为 '{user_name}' 的用户")
await self.send_text(f"抱歉,我的联系人里找不到叫做 '{user_name}' 的人,提醒设置失败。")
return False, f"用户 '{user_name}' 不存在"
user_id_to_remind = user_info.get("user_id")
user_name_to_remind = user_name
# 3. 创建并调度异步任务
try:
assert user_id_to_remind is not None
assert event_details is not None
reminder_task = ReminderTask(
delay=delay_seconds,
stream_id=self.chat_id,
is_group=self.is_group,
target_user_id=str(user_id_to_remind),
target_user_name=str(user_name_to_remind),
event_details=str(event_details),
creator_name=str(self.user_nickname),
)
await async_task_manager.add_task(reminder_task)
# 4. 发送确认消息
confirm_message = f"好的,我记下了。\n将在 {target_time.strftime('%Y-%m-%d %H:%M:%S')} 提醒 {user_name_to_remind}\n{event_details}"
await self.send_text(confirm_message)
return True, "提醒设置成功"
except Exception as e:
logger.error(f"[ReminderPlugin] 创建提醒任务时出错: {e}", exc_info=True)
await self.send_text("抱歉,设置提醒时发生了一点内部错误。")
return False, "设置提醒时发生内部错误"
# =============================== Plugin ===============================
@register_plugin
class ReminderPlugin(BasePlugin):
"""一个能从对话中智能识别并设置定时提醒的插件。"""
# --- 插件基础信息 ---
plugin_name = "reminder_plugin"
enable_plugin = True
dependencies = []
python_dependencies = []
config_file_name = "config.toml"
config_schema = {}
def get_plugin_components(self) -> List[Tuple[ActionInfo, Type[BaseAction]]]:
"""注册插件的所有功能组件。"""
return [(RemindAction.get_action_info(), RemindAction)]

View File

@@ -28,20 +28,20 @@ class PlanManager:
if target_month is None: if target_month is None:
target_month = datetime.now().strftime("%Y-%m") target_month = datetime.now().strftime("%Y-%m")
if not has_active_plans(target_month): if not await has_active_plans(target_month):
logger.info(f" {target_month} 没有任何有效的月度计划,将触发同步生成。") logger.info(f" {target_month} 没有任何有效的月度计划,将触发同步生成。")
generation_successful = await self._generate_monthly_plans_logic(target_month) generation_successful = await self._generate_monthly_plans_logic(target_month)
return generation_successful return generation_successful
else: else:
logger.info(f"{target_month} 已存在有效的月度计划。") logger.info(f"{target_month} 已存在有效的月度计划。")
plans = get_active_plans_for_month(target_month) plans = await get_active_plans_for_month(target_month)
max_plans = global_config.planning_system.max_plans_per_month max_plans = global_config.planning_system.max_plans_per_month
if len(plans) > max_plans: if len(plans) > max_plans:
logger.warning(f"当前月度计划数量 ({len(plans)}) 超出上限 ({max_plans}),将自动删除多余的计划。") logger.warning(f"当前月度计划数量 ({len(plans)}) 超出上限 ({max_plans}),将自动删除多余的计划。")
plans_to_delete = plans[: len(plans) - max_plans] plans_to_delete = plans[: len(plans) - max_plans]
delete_ids = [p.id for p in plans_to_delete] delete_ids = [p.id for p in plans_to_delete]
delete_plans_by_ids(delete_ids) # type: ignore await delete_plans_by_ids(delete_ids) # type: ignore
plans = get_active_plans_for_month(target_month) plans = await get_active_plans_for_month(target_month)
if plans: if plans:
plan_texts = "\n".join([f" {i + 1}. {plan.plan_text}" for i, plan in enumerate(plans)]) plan_texts = "\n".join([f" {i + 1}. {plan.plan_text}" for i, plan in enumerate(plans)])
@@ -64,11 +64,11 @@ class PlanManager:
return False return False
last_month = self._get_previous_month(target_month) last_month = self._get_previous_month(target_month)
archived_plans = get_archived_plans_for_month(last_month) archived_plans = await get_archived_plans_for_month(last_month)
plans = await self.llm_generator.generate_plans_with_llm(target_month, archived_plans) plans = await self.llm_generator.generate_plans_with_llm(target_month, archived_plans)
if plans: if plans:
add_new_plans(plans, target_month) await add_new_plans(plans, target_month)
logger.info(f"成功为 {target_month} 生成并保存了 {len(plans)} 条月度计划。") logger.info(f"成功为 {target_month} 生成并保存了 {len(plans)} 条月度计划。")
return True return True
else: else:
@@ -95,11 +95,11 @@ class PlanManager:
if target_month is None: if target_month is None:
target_month = datetime.now().strftime("%Y-%m") target_month = datetime.now().strftime("%Y-%m")
logger.info(f" 开始归档 {target_month} 的活跃月度计划...") logger.info(f" 开始归档 {target_month} 的活跃月度计划...")
archived_count = archive_active_plans_for_month(target_month) archived_count = await archive_active_plans_for_month(target_month)
logger.info(f" 成功归档了 {archived_count}{target_month} 的月度计划。") logger.info(f" 成功归档了 {archived_count}{target_month} 的月度计划。")
except Exception as e: except Exception as e:
logger.error(f" 归档 {target_month} 月度计划时发生错误: {e}") logger.error(f" 归档 {target_month} 月度计划时发生错误: {e}")
def get_plans_for_schedule(self, month: str, max_count: int) -> List: async def get_plans_for_schedule(self, month: str, max_count: int) -> List:
avoid_days = global_config.planning_system.avoid_repetition_days avoid_days = global_config.planning_system.avoid_repetition_days
return get_smart_plans_for_daily_schedule(month, max_count=max_count, avoid_days=avoid_days) return await get_smart_plans_for_daily_schedule(month, max_count=max_count, avoid_days=avoid_days)

View File

@@ -53,8 +53,8 @@ price_out = 8.0 # 输出价格用于API调用统计
#use_anti_truncation = true # [可选] 启用反截断功能。当模型输出不完整时系统会自动重试。建议只为有需要的模型如Gemini开启。 #use_anti_truncation = true # [可选] 启用反截断功能。当模型输出不完整时系统会自动重试。建议只为有需要的模型如Gemini开启。
[[models]] [[models]]
model_identifier = "deepseek-ai/DeepSeek-V3" model_identifier = "deepseek-ai/DeepSeek-V3.1-Terminus"
name = "siliconflow-deepseek-v3" name = "siliconflow-deepseek-ai/DeepSeek-V3.1-Terminus"
api_provider = "SiliconFlow" api_provider = "SiliconFlow"
price_in = 2.0 price_in = 2.0
price_out = 8.0 price_out = 8.0
@@ -122,7 +122,7 @@ price_in = 4.0
price_out = 16.0 price_out = 16.0
[model_task_config.utils] # 在麦麦的一些组件中使用的模型,例如表情包模块,取名模块,关系模块,是麦麦必须的模型 [model_task_config.utils] # 在麦麦的一些组件中使用的模型,例如表情包模块,取名模块,关系模块,是麦麦必须的模型
model_list = ["siliconflow-deepseek-v3"] # 使用的模型列表,每个子项对应上面的模型名称(name) model_list = ["siliconflow-deepseek-ai/DeepSeek-V3.1-Terminus"] # 使用的模型列表,每个子项对应上面的模型名称(name)
temperature = 0.2 # 模型温度新V3建议0.1-0.3 temperature = 0.2 # 模型温度新V3建议0.1-0.3
max_tokens = 800 # 最大输出token数 max_tokens = 800 # 最大输出token数
#concurrency_count = 2 # 并发请求数量默认为1不并发设置为2或更高启用并发 #concurrency_count = 2 # 并发请求数量默认为1不并发设置为2或更高启用并发
@@ -133,28 +133,28 @@ temperature = 0.7
max_tokens = 800 max_tokens = 800
[model_task_config.replyer] # 首要回复模型,还用于表达器和表达方式学习 [model_task_config.replyer] # 首要回复模型,还用于表达器和表达方式学习
model_list = ["siliconflow-deepseek-v3"] model_list = ["siliconflow-deepseek-ai/DeepSeek-V3.1-Terminus"]
temperature = 0.2 # 模型温度新V3建议0.1-0.3 temperature = 0.2 # 模型温度新V3建议0.1-0.3
max_tokens = 800 max_tokens = 800
[model_task_config.planner] #决策:负责决定麦麦该做什么的模型 [model_task_config.planner] #决策:负责决定麦麦该做什么的模型
model_list = ["siliconflow-deepseek-v3"] model_list = ["siliconflow-deepseek-ai/DeepSeek-V3.1-Terminus"]
temperature = 0.3 temperature = 0.3
max_tokens = 800 max_tokens = 800
[model_task_config.emotion] #负责麦麦的情绪变化 [model_task_config.emotion] #负责麦麦的情绪变化
model_list = ["siliconflow-deepseek-v3"] model_list = ["siliconflow-deepseek-ai/DeepSeek-V3.1-Terminus"]
temperature = 0.3 temperature = 0.3
max_tokens = 800 max_tokens = 800
[model_task_config.mood] #负责麦麦的心情变化 [model_task_config.mood] #负责麦麦的心情变化
model_list = ["siliconflow-deepseek-v3"] model_list = ["siliconflow-deepseek-ai/DeepSeek-V3.1-Terminus"]
temperature = 0.3 temperature = 0.3
max_tokens = 800 max_tokens = 800
[model_task_config.maizone] # maizone模型 [model_task_config.maizone] # maizone模型
model_list = ["siliconflow-deepseek-v3"] model_list = ["siliconflow-deepseek-ai/DeepSeek-V3.1-Terminus"]
temperature = 0.7 temperature = 0.7
max_tokens = 800 max_tokens = 800
@@ -181,7 +181,7 @@ temperature = 0.7
max_tokens = 800 max_tokens = 800
[model_task_config.schedule_generator]#日程表生成模型 [model_task_config.schedule_generator]#日程表生成模型
model_list = ["siliconflow-deepseek-v3"] model_list = ["siliconflow-deepseek-ai/DeepSeek-V3.1-Terminus"]
temperature = 0.7 temperature = 0.7
max_tokens = 1000 max_tokens = 1000
@@ -191,12 +191,12 @@ temperature = 0.1 # 低温度确保检测结果稳定
max_tokens = 200 # 检测结果不需要太长的输出 max_tokens = 200 # 检测结果不需要太长的输出
[model_task_config.monthly_plan_generator] # 月层计划生成模型 [model_task_config.monthly_plan_generator] # 月层计划生成模型
model_list = ["siliconflow-deepseek-v3"] model_list = ["siliconflow-deepseek-ai/DeepSeek-V3.1-Terminus"]
temperature = 0.7 temperature = 0.7
max_tokens = 1000 max_tokens = 1000
[model_task_config.relationship_tracker] # 用户关系追踪模型 [model_task_config.relationship_tracker] # 用户关系追踪模型
model_list = ["siliconflow-deepseek-v3"] model_list = ["siliconflow-deepseek-ai/DeepSeek-V3.1-Terminus"]
temperature = 0.7 temperature = 0.7
max_tokens = 1000 max_tokens = 1000
@@ -210,12 +210,12 @@ embedding_dimension = 1024
#------------LPMM知识库模型------------ #------------LPMM知识库模型------------
[model_task_config.lpmm_entity_extract] # 实体提取模型 [model_task_config.lpmm_entity_extract] # 实体提取模型
model_list = ["siliconflow-deepseek-v3"] model_list = ["siliconflow-deepseek-ai/DeepSeek-V3.1-Terminus"]
temperature = 0.2 temperature = 0.2
max_tokens = 800 max_tokens = 800
[model_task_config.lpmm_rdf_build] # RDF构建模型 [model_task_config.lpmm_rdf_build] # RDF构建模型
model_list = ["siliconflow-deepseek-v3"] model_list = ["siliconflow-deepseek-ai/DeepSeek-V3.1-Terminus"]
temperature = 0.2 temperature = 0.2
max_tokens = 800 max_tokens = 800