This commit is contained in:
春河晴
2025-06-10 16:13:31 +09:00
parent 440e8bf7f3
commit 8d9a88a903
70 changed files with 1598 additions and 1642 deletions

View File

@@ -40,7 +40,7 @@ DEFAULT_CONFIG = {
"default_guidance_scale": 2.5,
"default_seed": 42,
"cache_enabled": True,
"cache_max_size": 10
"cache_max_size": 10,
}
@@ -49,37 +49,37 @@ def validate_and_fix_config(config_path: str) -> bool:
try:
with open(config_path, "r", encoding="utf-8") as f:
config = toml.load(f)
# 检查缺失的配置项
missing_keys = []
fixed = False
for key, default_value in DEFAULT_CONFIG.items():
if key not in config:
missing_keys.append(key)
config[key] = default_value
fixed = True
logger.info(f"添加缺失的配置项: {key} = {default_value}")
# 验证配置值的类型和范围
if isinstance(config.get("default_guidance_scale"), (int, float)):
if not 0.1 <= config["default_guidance_scale"] <= 20.0:
config["default_guidance_scale"] = 2.5
fixed = True
logger.info("修复无效的 default_guidance_scale 值")
if isinstance(config.get("default_seed"), (int, float)):
config["default_seed"] = int(config["default_seed"])
else:
config["default_seed"] = 42
fixed = True
logger.info("修复无效的 default_seed 值")
if config.get("cache_max_size") and not isinstance(config["cache_max_size"], int):
config["cache_max_size"] = 10
fixed = True
logger.info("修复无效的 cache_max_size 值")
# 如果有修复,写回文件
if fixed:
# 创建备份
@@ -87,14 +87,14 @@ def validate_and_fix_config(config_path: str) -> bool:
if os.path.exists(config_path):
os.rename(config_path, backup_path)
logger.info(f"已创建配置备份: {backup_path}")
# 写入修复后的配置
with open(config_path, "w", encoding="utf-8") as f:
toml.dump(config, f)
logger.info(f"配置文件已修复: {config_path}")
return True
except Exception as e:
logger.error(f"验证配置文件时出错: {e}")
return False

View File

@@ -37,15 +37,15 @@ class PicAction(PluginAction):
]
enable_plugin = False
action_config_file_name = "pic_action_config.toml"
# 激活类型设置
focus_activation_type = ActionActivationType.LLM_JUDGE # Focus模式使用LLM判定精确理解需求
normal_activation_type = ActionActivationType.KEYWORD # Normal模式使用关键词激活快速响应
normal_activation_type = ActionActivationType.KEYWORD # Normal模式使用关键词激活快速响应
# 关键词设置用于Normal模式
activation_keywords = ["", "绘制", "生成图片", "画图", "draw", "paint", "图片生成"]
keyword_case_sensitive = False
# LLM判定提示词用于Focus模式
llm_judge_prompt = """
判定是否需要使用图片生成动作的条件:
@@ -67,31 +67,31 @@ class PicAction(PluginAction):
4. 技术讨论中提到绘图概念但无生成需求
5. 用户明确表示不需要图片时
"""
# Random激活概率备用
random_activation_probability = 0.15 # 适中概率,图片生成比较有趣
# 简单的请求缓存,避免短时间内重复请求
_request_cache = {}
_cache_max_size = 10
# 模式启用设置 - 图片生成在所有模式下可用
mode_enable = ChatMode.ALL
# 并行执行设置 - 图片生成可以与回复并行执行,不覆盖回复内容
parallel_action = False
@classmethod
def _get_cache_key(cls, description: str, model: str, size: str) -> str:
"""生成缓存键"""
return f"{description[:100]}|{model}|{size}" # 限制描述长度避免键过长
@classmethod
def _cleanup_cache(cls):
"""清理缓存,保持大小在限制内"""
if len(cls._request_cache) > cls._cache_max_size:
# 简单的FIFO策略移除最旧的条目
keys_to_remove = list(cls._request_cache.keys())[:-cls._cache_max_size//2]
keys_to_remove = list(cls._request_cache.keys())[: -cls._cache_max_size // 2]
for key in keys_to_remove:
del cls._request_cache[key]
@@ -169,7 +169,7 @@ class PicAction(PluginAction):
cached_result = self._request_cache[cache_key]
logger.info(f"{self.log_prefix} 使用缓存的图片结果")
await self.send_message_by_expressor("我之前画过类似的图片,用之前的结果~")
# 直接发送缓存的结果
send_success = await self.send_message(type="image", data=cached_result)
if send_success:
@@ -258,7 +258,7 @@ class PicAction(PluginAction):
# 缓存成功的结果
self._request_cache[cache_key] = base64_image_string
self._cleanup_cache()
await self.send_message_by_expressor("图片表情已发送!")
return True, "图片表情已发送"
else:
@@ -370,7 +370,7 @@ class PicAction(PluginAction):
def _validate_image_size(self, image_size: str) -> bool:
"""验证图片尺寸格式"""
try:
width, height = map(int, image_size.split('x'))
width, height = map(int, image_size.split("x"))
return 100 <= width <= 10000 and 100 <= height <= 10000
except (ValueError, TypeError):
return False

View File

@@ -11,4 +11,4 @@
- 用户输入特定格式的命令时触发
- 通过命令前缀(如/)快速执行特定功能
- 提供快速响应的交互方式
"""
"""

View File

@@ -1,4 +1,4 @@
"""示例命令包
包含示例命令的实现
"""
"""

View File

@@ -5,27 +5,28 @@ import random
logger = get_logger("custom_prefix_command")
@register_command
class DiceCommand(BaseCommand):
"""骰子命令,使用!前缀而不是/前缀"""
command_name = "dice"
command_description = "骰子命令随机生成1-6的数字"
command_pattern = r"^[!](?:dice|骰子)(?:\s+(?P<count>\d+))?$" # 匹配 !dice 或 !骰子,可选参数为骰子数量
command_help = "使用方法: !dice [数量] 或 !骰子 [数量] - 掷骰子默认掷1个"
command_examples = ["!dice", "!骰子", "!dice 3", "!骰子 5"]
enable_command = True
async def execute(self) -> Tuple[bool, Optional[str]]:
"""执行骰子命令
Returns:
Tuple[bool, Optional[str]]: (是否执行成功, 回复消息)
"""
try:
# 获取骰子数量默认为1
count_str = self.matched_groups.get("count")
# 确保count_str不为None
if count_str is None:
count = 1 # 默认值
@@ -38,10 +39,10 @@ class DiceCommand(BaseCommand):
return False, "一次最多只能掷10个骰子"
except ValueError:
return False, "骰子数量必须是整数"
# 生成随机数
results = [random.randint(1, 6) for _ in range(count)]
# 构建回复消息
if count == 1:
message = f"🎲 掷出了 {results[0]}"
@@ -49,10 +50,10 @@ class DiceCommand(BaseCommand):
dice_results = ", ".join(map(str, results))
total = sum(results)
message = f"🎲 掷出了 {count} 个骰子: [{dice_results}],总点数: {total}"
logger.info(f"{self.log_prefix} 执行骰子命令: {message}")
return True, message
except Exception as e:
logger.error(f"{self.log_prefix} 执行骰子命令时出错: {e}")
return False, f"执行命令时出错: {str(e)}"
return False, f"执行命令时出错: {str(e)}"

View File

@@ -4,90 +4,86 @@ from typing import Tuple, Optional
logger = get_logger("help_command")
@register_command
class HelpCommand(BaseCommand):
"""帮助命令,显示所有可用命令的帮助信息"""
command_name = "help"
command_description = "显示所有可用命令的帮助信息"
command_pattern = r"^/help(?:\s+(?P<command>\w+))?$" # 匹配 /help 或 /help 命令名
command_help = "使用方法: /help [命令名] - 显示所有命令或特定命令的帮助信息"
command_examples = ["/help", "/help echo"]
enable_command = True
async def execute(self) -> Tuple[bool, Optional[str]]:
"""执行帮助命令
Returns:
Tuple[bool, Optional[str]]: (是否执行成功, 回复消息)
"""
try:
# 获取匹配到的命令名(如果有)
command_name = self.matched_groups.get("command")
# 如果指定了命令名,显示该命令的详细帮助
if command_name:
logger.info(f"{self.log_prefix} 查询命令帮助: {command_name}")
return self._show_command_help(command_name)
# 否则,显示所有命令的简要帮助
logger.info(f"{self.log_prefix} 查询所有命令帮助")
return self._show_all_commands()
except Exception as e:
logger.error(f"{self.log_prefix} 执行帮助命令时出错: {e}")
return False, f"执行命令时出错: {str(e)}"
def _show_command_help(self, command_name: str) -> Tuple[bool, str]:
"""显示特定命令的详细帮助信息
Args:
command_name: 命令名称
Returns:
Tuple[bool, str]: (是否执行成功, 回复消息)
"""
# 查找命令
command_cls = _COMMAND_REGISTRY.get(command_name)
if not command_cls:
return False, f"未找到命令: {command_name}"
# 获取命令信息
description = getattr(command_cls, "command_description", "无描述")
help_text = getattr(command_cls, "command_help", "无帮助信息")
examples = getattr(command_cls, "command_examples", [])
# 构建帮助信息
help_info = [
f"【命令】: {command_name}",
f"【描述】: {description}",
f"【用法】: {help_text}"
]
help_info = [f"【命令】: {command_name}", f"【描述】: {description}", f"【用法】: {help_text}"]
# 添加示例
if examples:
help_info.append("【示例】:")
for example in examples:
help_info.append(f" {example}")
return True, "\n".join(help_info)
def _show_all_commands(self) -> Tuple[bool, str]:
"""显示所有可用命令的简要帮助信息
Returns:
Tuple[bool, str]: (是否执行成功, 回复消息)
"""
# 获取所有已启用的命令
enabled_commands = {
name: cls for name, cls in _COMMAND_REGISTRY.items()
if getattr(cls, "enable_command", True)
name: cls for name, cls in _COMMAND_REGISTRY.items() if getattr(cls, "enable_command", True)
}
if not enabled_commands:
return True, "当前没有可用的命令"
# 构建命令列表
command_list = ["可用命令列表:"]
for name, cls in sorted(enabled_commands.items()):
@@ -107,9 +103,9 @@ class HelpCommand(BaseCommand):
else:
# 默认使用/name作为前缀
prefix = f"/{name}"
command_list.append(f"{prefix} - {description}")
command_list.append("\n使用 /help <命令名> 获取特定命令的详细帮助")
return True, "\n".join(command_list)
return True, "\n".join(command_list)

View File

@@ -1,43 +1,43 @@
from src.common.logger_manager import get_logger
from src.chat.command.command_handler import BaseCommand, register_command
from typing import Tuple, Optional
import json
logger = get_logger("message_info_command")
@register_command
class MessageInfoCommand(BaseCommand):
"""消息信息查看命令,展示发送命令的原始消息和相关信息"""
command_name = "msginfo"
command_description = "查看发送命令的原始消息信息"
command_pattern = r"^/msginfo(?:\s+(?P<detail>full|simple))?$"
command_help = "使用方法: /msginfo [full|simple] - 查看当前消息的详细信息"
command_examples = ["/msginfo", "/msginfo full", "/msginfo simple"]
enable_command = True
async def execute(self) -> Tuple[bool, Optional[str]]:
"""执行消息信息查看命令"""
try:
detail_level = self.matched_groups.get("detail", "simple")
logger.info(f"{self.log_prefix} 查看消息信息,详细级别: {detail_level}")
if detail_level == "full":
info_text = self._get_full_message_info()
else:
info_text = self._get_simple_message_info()
return True, info_text
except Exception as e:
logger.error(f"{self.log_prefix} 获取消息信息时出错: {e}")
return False, f"获取消息信息失败: {str(e)}"
def _get_simple_message_info(self) -> str:
"""获取简化的消息信息"""
message = self.message
# 基础信息
info_lines = [
"📨 消息信息概览",
@@ -45,157 +45,181 @@ class MessageInfoCommand(BaseCommand):
f"⏰ 时间: {message.message_info.time}",
f"🌐 平台: {message.message_info.platform}",
]
# 发送者信息
user = message.message_info.user_info
info_lines.extend([
"",
"👤 发送者信息:",
f" 用户ID: {user.user_id}",
f" 昵称: {user.user_nickname}",
f" 群名片: {user.user_cardname or ''}",
])
info_lines.extend(
[
"",
"👤 发送者信息:",
f" 用户ID: {user.user_id}",
f" 昵称: {user.user_nickname}",
f" 群名片: {user.user_cardname or ''}",
]
)
# 群聊信息(如果是群聊)
if message.message_info.group_info:
group = message.message_info.group_info
info_lines.extend([
"",
"👥 群聊信息:",
f" ID: {group.group_id}",
f": {group.group_name or '未知'}",
])
info_lines.extend(
[
"",
"👥聊信息:",
f"ID: {group.group_id}",
f" 群名: {group.group_name or '未知'}",
]
)
else:
info_lines.extend([
"",
"💬 消息类型: 私聊消息",
])
info_lines.extend(
[
"",
"💬 消息类型: 私聊消息",
]
)
# 消息内容
info_lines.extend([
"",
"📝 消息内容:",
f" 原始文本: {message.processed_plain_text}",
f" 是否表情: {'' if getattr(message, 'is_emoji', False) else ''}",
])
# 聊天流信息
if hasattr(message, 'chat_stream') and message.chat_stream:
chat_stream = message.chat_stream
info_lines.extend([
info_lines.extend(
[
"",
"🔄 聊天流信息:",
f" 流ID: {chat_stream.stream_id}",
f" 是否激活: {'' if chat_stream.is_active else ''}",
])
"📝 消息内容:",
f" 原始文本: {message.processed_plain_text}",
f" 是否表情: {'' if getattr(message, 'is_emoji', False) else ''}",
]
)
# 聊天流信息
if hasattr(message, "chat_stream") and message.chat_stream:
chat_stream = message.chat_stream
info_lines.extend(
[
"",
"🔄 聊天流信息:",
f" 流ID: {chat_stream.stream_id}",
f" 是否激活: {'' if chat_stream.is_active else ''}",
]
)
return "\n".join(info_lines)
def _get_full_message_info(self) -> str:
"""获取完整的消息信息(包含技术细节)"""
message = self.message
info_lines = [
"📨 完整消息信息",
"=" * 40,
]
# 消息基础信息
info_lines.extend([
"",
"🔍 基础消息信息:",
f" 消息ID: {message.message_info.message_id}",
f" 时间戳: {message.message_info.time}",
f" 平台: {message.message_info.platform}",
f" 处理后文本: {message.processed_plain_text}",
f" 详细文本: {message.detailed_plain_text[:100]}{'...' if len(message.detailed_plain_text) > 100 else ''}",
])
info_lines.extend(
[
"",
"🔍 基础消息信息:",
f" 消息ID: {message.message_info.message_id}",
f" 时间戳: {message.message_info.time}",
f" 平台: {message.message_info.platform}",
f" 处理后文本: {message.processed_plain_text}",
f" 详细文本: {message.detailed_plain_text[:100]}{'...' if len(message.detailed_plain_text) > 100 else ''}",
]
)
# 用户详细信息
user = message.message_info.user_info
info_lines.extend([
"",
"👤 发送者详细信息:",
f" 用户ID: {user.user_id}",
f" 昵称: {user.user_nickname}",
f" 群名片: {user.user_cardname or ''}",
f" 平台: {user.platform}",
])
info_lines.extend(
[
"",
"👤 发送者详细信息:",
f" 用户ID: {user.user_id}",
f" 昵称: {user.user_nickname}",
f" 群名片: {user.user_cardname or ''}",
f" 平台: {user.platform}",
]
)
# 群聊详细信息
if message.message_info.group_info:
group = message.message_info.group_info
info_lines.extend([
"",
"👥 群聊详细信息:",
f" ID: {group.group_id}",
f": {group.group_name or '未知'}",
f" 平台: {group.platform}",
])
info_lines.extend(
[
"",
"👥聊详细信息:",
f"ID: {group.group_id}",
f" 群名: {group.group_name or '未知'}",
f" 平台: {group.platform}",
]
)
else:
info_lines.append("\n💬 消息类型: 私聊消息")
# 消息段信息
if message.message_segment:
info_lines.extend([
"",
"📦 消息段信息:",
f" 类型: {message.message_segment.type}",
f" 数据类型: {type(message.message_segment.data).__name__}",
f" 数据预览: {str(message.message_segment.data)[:200]}{'...' if len(str(message.message_segment.data)) > 200 else ''}",
])
info_lines.extend(
[
"",
"📦 消息段信息:",
f" 类型: {message.message_segment.type}",
f" 数据类型: {type(message.message_segment.data).__name__}",
f" 数据预览: {str(message.message_segment.data)[:200]}{'...' if len(str(message.message_segment.data)) > 200 else ''}",
]
)
# 聊天流详细信息
if hasattr(message, 'chat_stream') and message.chat_stream:
if hasattr(message, "chat_stream") and message.chat_stream:
chat_stream = message.chat_stream
info_lines.extend([
"",
"🔄 聊天流详细信息:",
f" 流ID: {chat_stream.stream_id}",
f" 平台: {chat_stream.platform}",
f" 是否激活: {'' if chat_stream.is_active else ''}",
f" 用户信息: {chat_stream.user_info.user_nickname} ({chat_stream.user_info.user_id})",
f" 信息: {getattr(chat_stream.group_info, 'group_name', '私聊') if chat_stream.group_info else '私聊'}",
])
info_lines.extend(
[
"",
"🔄 聊天流详细信息:",
f" 流ID: {chat_stream.stream_id}",
f" 平台: {chat_stream.platform}",
f" 是否激活: {'' if chat_stream.is_active else ''}",
f" 用户信息: {chat_stream.user_info.user_nickname} ({chat_stream.user_info.user_id})",
f" 群信息: {getattr(chat_stream.group_info, 'group_name', '私聊') if chat_stream.group_info else '私聊'}",
]
)
# 回复信息
if hasattr(message, 'reply') and message.reply:
info_lines.extend([
"",
"↩️ 回复信息:",
f" 回复消息ID: {message.reply.message_info.message_id}",
f" 回复内容: {message.reply.processed_plain_text[:100]}{'...' if len(message.reply.processed_plain_text) > 100 else ''}",
])
if hasattr(message, "reply") and message.reply:
info_lines.extend(
[
"",
"↩️ 回复信息:",
f" 回复消息ID: {message.reply.message_info.message_id}",
f" 回复内容: {message.reply.processed_plain_text[:100]}{'...' if len(message.reply.processed_plain_text) > 100 else ''}",
]
)
# 原始消息数据(如果存在)
if hasattr(message, 'raw_message') and message.raw_message:
info_lines.extend([
"",
"🗂️ 原始消息数据:",
f" 数据类型: {type(message.raw_message).__name__}",
f" 数据大小: {len(str(message.raw_message))} 字符",
])
if hasattr(message, "raw_message") and message.raw_message:
info_lines.extend(
[
"",
"🗂️ 原始消息数据:",
f" 数据类型: {type(message.raw_message).__name__}",
f" 数据大小: {len(str(message.raw_message))} 字符",
]
)
return "\n".join(info_lines)
@register_command
class SenderInfoCommand(BaseCommand):
"""发送者信息命令,快速查看发送者信息"""
command_name = "whoami"
command_description = "查看发送命令的用户信息"
command_pattern = r"^/whoami$"
command_help = "使用方法: /whoami - 查看你的用户信息"
command_examples = ["/whoami"]
enable_command = True
async def execute(self) -> Tuple[bool, Optional[str]]:
"""执行发送者信息查看命令"""
try:
user = self.message.message_info.user_info
group = self.message.message_info.group_info
info_lines = [
"👤 你的身份信息",
f"🆔 用户ID: {user.user_id}",
@@ -203,19 +227,21 @@ class SenderInfoCommand(BaseCommand):
f"🏷️ 群名片: {user.user_cardname or ''}",
f"🌐 平台: {user.platform}",
]
if group:
info_lines.extend([
"",
"👥 当前群聊:",
f"🆔 群ID: {group.group_id}",
f"📝: {group.group_name or '未知'}",
])
info_lines.extend(
[
"",
"👥 当前群聊:",
f"🆔ID: {group.group_id}",
f"📝 群名: {group.group_name or '未知'}",
]
)
else:
info_lines.append("\n💬 当前在私聊中")
return True, "\n".join(info_lines)
except Exception as e:
logger.error(f"{self.log_prefix} 获取发送者信息时出错: {e}")
return False, f"获取发送者信息失败: {str(e)}"
@@ -224,59 +250,65 @@ class SenderInfoCommand(BaseCommand):
@register_command
class ChatStreamInfoCommand(BaseCommand):
"""聊天流信息命令"""
command_name = "streaminfo"
command_description = "查看当前聊天流的详细信息"
command_pattern = r"^/streaminfo$"
command_help = "使用方法: /streaminfo - 查看当前聊天流信息"
command_examples = ["/streaminfo"]
enable_command = True
async def execute(self) -> Tuple[bool, Optional[str]]:
"""执行聊天流信息查看命令"""
try:
if not hasattr(self.message, 'chat_stream') or not self.message.chat_stream:
if not hasattr(self.message, "chat_stream") or not self.message.chat_stream:
return False, "无法获取聊天流信息"
chat_stream = self.message.chat_stream
info_lines = [
"🔄 聊天流信息",
f"🆔 流ID: {chat_stream.stream_id}",
f"🌐 平台: {chat_stream.platform}",
f"⚡ 状态: {'激活' if chat_stream.is_active else '非激活'}",
]
# 用户信息
if chat_stream.user_info:
info_lines.extend([
"",
"👤 关联用户:",
f" ID: {chat_stream.user_info.user_id}",
f" 昵称: {chat_stream.user_info.user_nickname}",
])
info_lines.extend(
[
"",
"👤 关联用户:",
f" ID: {chat_stream.user_info.user_id}",
f" 昵称: {chat_stream.user_info.user_nickname}",
]
)
# 群信息
if chat_stream.group_info:
info_lines.extend([
"",
"👥 关联群聊:",
f" 群ID: {chat_stream.group_info.group_id}",
f": {chat_stream.group_info.group_name or '未知'}",
])
info_lines.extend(
[
"",
"👥 关联群聊:",
f"ID: {chat_stream.group_info.group_id}",
f" 群名: {chat_stream.group_info.group_name or '未知'}",
]
)
else:
info_lines.append("\n💬 类型: 私聊流")
# 最近消息统计
if hasattr(chat_stream, 'last_messages'):
if hasattr(chat_stream, "last_messages"):
msg_count = len(chat_stream.last_messages)
info_lines.extend([
"",
f"📈 消息统计: 记录了 {msg_count} 条最近消息",
])
info_lines.extend(
[
"",
f"📈 消息统计: 记录了 {msg_count} 条最近消息",
]
)
return True, "\n".join(info_lines)
except Exception as e:
logger.error(f"{self.log_prefix} 获取聊天流信息时出错: {e}")
return False, f"获取聊天流信息失败: {str(e)}"
return False, f"获取聊天流信息失败: {str(e)}"

View File

@@ -5,43 +5,41 @@ from typing import Tuple, Optional
logger = get_logger("send_msg_command")
@register_command
class SendMessageCommand(BaseCommand, MessageAPI):
"""发送消息命令,可以向指定群聊或私聊发送消息"""
command_name = "send"
command_description = "向指定群聊或私聊发送消息"
command_pattern = r"^/send\s+(?P<target_type>group|user)\s+(?P<target_id>\d+)\s+(?P<content>.+)$"
command_help = "使用方法: /send <group|user> <ID> <消息内容> - 发送消息到指定群聊或用户"
command_examples = [
"/send group 123456789 大家好!",
"/send user 987654321 私聊消息"
]
command_examples = ["/send group 123456789 大家好!", "/send user 987654321 私聊消息"]
enable_command = True
def __init__(self, message):
super().__init__(message)
# 初始化MessageAPI需要的服务虽然这里不会用到但保持一致性
self._services = {}
self.log_prefix = f"[Command:{self.command_name}]"
async def execute(self) -> Tuple[bool, Optional[str]]:
"""执行发送消息命令
Returns:
Tuple[bool, Optional[str]]: (是否执行成功, 回复消息)
"""
try:
# 获取匹配到的参数
target_type = self.matched_groups.get("target_type") # group 或 user
target_id = self.matched_groups.get("target_id") # 群ID或用户ID
content = self.matched_groups.get("content") # 消息内容
target_id = self.matched_groups.get("target_id") # 群ID或用户ID
content = self.matched_groups.get("content") # 消息内容
if not all([target_type, target_id, content]):
return False, "命令参数不完整,请检查格式"
logger.info(f"{self.log_prefix} 执行发送消息命令: {target_type}:{target_id} -> {content[:50]}...")
# 根据目标类型调用不同的发送方法
if target_type == "group":
success = await self._send_to_group(target_id, content)
@@ -51,24 +49,24 @@ class SendMessageCommand(BaseCommand, MessageAPI):
target_desc = f"用户 {target_id}"
else:
return False, f"不支持的目标类型: {target_type},只支持 group 或 user"
# 返回执行结果
if success:
return True, f"✅ 消息已成功发送到 {target_desc}"
else:
return False, f"❌ 消息发送失败,可能是目标 {target_desc} 不存在或没有权限"
except Exception as e:
logger.error(f"{self.log_prefix} 执行发送消息命令时出错: {e}")
return False, f"命令执行出错: {str(e)}"
async def _send_to_group(self, group_id: str, content: str) -> bool:
"""发送消息到群聊
Args:
group_id: 群聊ID
content: 消息内容
Returns:
bool: 是否发送成功
"""
@@ -76,27 +74,27 @@ class SendMessageCommand(BaseCommand, MessageAPI):
success = await self.send_text_to_group(
text=content,
group_id=group_id,
platform="qq" # 默认使用QQ平台
platform="qq", # 默认使用QQ平台
)
if success:
logger.info(f"{self.log_prefix} 成功发送消息到群聊 {group_id}")
else:
logger.warning(f"{self.log_prefix} 发送消息到群聊 {group_id} 失败")
return success
except Exception as e:
logger.error(f"{self.log_prefix} 发送群聊消息时出错: {e}")
return False
async def _send_to_user(self, user_id: str, content: str) -> bool:
"""发送消息到私聊
Args:
user_id: 用户ID
content: 消息内容
Returns:
bool: 是否发送成功
"""
@@ -104,16 +102,16 @@ class SendMessageCommand(BaseCommand, MessageAPI):
success = await self.send_text_to_user(
text=content,
user_id=user_id,
platform="qq" # 默认使用QQ平台
platform="qq", # 默认使用QQ平台
)
if success:
logger.info(f"{self.log_prefix} 成功发送消息到用户 {user_id}")
else:
logger.warning(f"{self.log_prefix} 发送消息到用户 {user_id} 失败")
return success
except Exception as e:
logger.error(f"{self.log_prefix} 发送私聊消息时出错: {e}")
return False
return False

View File

@@ -5,10 +5,11 @@ from typing import Tuple, Optional
logger = get_logger("send_msg_enhanced")
@register_command
class SendMessageEnhancedCommand(BaseCommand, MessageAPI):
"""增强版发送消息命令,支持多种消息类型和平台"""
command_name = "sendfull"
command_description = "增强版消息发送命令,支持多种类型和平台"
command_pattern = r"^/sendfull\s+(?P<msg_type>text|image|emoji)\s+(?P<target_type>group|user)\s+(?P<target_id>\d+)(?:\s+(?P<platform>\w+))?\s+(?P<content>.+)$"
@@ -17,108 +18,93 @@ class SendMessageEnhancedCommand(BaseCommand, MessageAPI):
"/sendfull text group 123456789 qq 大家好!这是文本消息",
"/sendfull image user 987654321 https://example.com/image.jpg",
"/sendfull emoji group 123456789 😄",
"/sendfull text user 987654321 qq 私聊消息"
"/sendfull text user 987654321 qq 私聊消息",
]
enable_command = True
def __init__(self, message):
super().__init__(message)
self._services = {}
self.log_prefix = f"[Command:{self.command_name}]"
async def execute(self) -> Tuple[bool, Optional[str]]:
"""执行增强版发送消息命令"""
try:
# 获取匹配参数
msg_type = self.matched_groups.get("msg_type") # 消息类型: text/image/emoji
target_type = self.matched_groups.get("target_type") # 目标类型: group/user
target_id = self.matched_groups.get("target_id") # 目标ID
platform = self.matched_groups.get("platform") or "qq" # 平台默认qq
content = self.matched_groups.get("content") # 内容
msg_type = self.matched_groups.get("msg_type") # 消息类型: text/image/emoji
target_type = self.matched_groups.get("target_type") # 目标类型: group/user
target_id = self.matched_groups.get("target_id") # 目标ID
platform = self.matched_groups.get("platform") or "qq" # 平台默认qq
content = self.matched_groups.get("content") # 内容
if not all([msg_type, target_type, target_id, content]):
return False, "命令参数不完整,请检查格式"
# 验证消息类型
valid_types = ["text", "image", "emoji"]
if msg_type not in valid_types:
return False, f"不支持的消息类型: {msg_type},支持的类型: {', '.join(valid_types)}"
# 验证目标类型
if target_type not in ["group", "user"]:
return False, "目标类型只能是 group 或 user"
logger.info(f"{self.log_prefix} 执行发送命令: {msg_type} -> {target_type}:{target_id} (平台:{platform})")
# 根据消息类型和目标类型发送消息
is_group = (target_type == "group")
is_group = target_type == "group"
success = await self.send_message_to_target(
message_type=msg_type,
content=content,
platform=platform,
target_id=target_id,
is_group=is_group
message_type=msg_type, content=content, platform=platform, target_id=target_id, is_group=is_group
)
# 构建结果消息
target_desc = f"{'群聊' if is_group else '用户'} {target_id} (平台: {platform})"
msg_type_desc = {
"text": "文本",
"image": "图片",
"emoji": "表情"
}.get(msg_type, msg_type)
msg_type_desc = {"text": "文本", "image": "图片", "emoji": "表情"}.get(msg_type, msg_type)
if success:
return True, f"{msg_type_desc}消息已成功发送到 {target_desc}"
else:
return False, f"{msg_type_desc}消息发送失败,可能是目标 {target_desc} 不存在或没有权限"
except Exception as e:
logger.error(f"{self.log_prefix} 执行增强发送命令时出错: {e}")
return False, f"命令执行出错: {str(e)}"
@register_command
@register_command
class SendQuickCommand(BaseCommand, MessageAPI):
"""快速发送文本消息命令"""
command_name = "msg"
command_description = "快速发送文本消息到群聊"
command_pattern = r"^/msg\s+(?P<group_id>\d+)\s+(?P<content>.+)$"
command_help = "使用方法: /msg <群ID> <消息内容> - 快速发送文本到指定群聊"
command_examples = [
"/msg 123456789 大家好!",
"/msg 987654321 这是一条快速消息"
]
command_examples = ["/msg 123456789 大家好!", "/msg 987654321 这是一条快速消息"]
enable_command = True
def __init__(self, message):
super().__init__(message)
self._services = {}
self.log_prefix = f"[Command:{self.command_name}]"
async def execute(self) -> Tuple[bool, Optional[str]]:
"""执行快速发送消息命令"""
try:
group_id = self.matched_groups.get("group_id")
content = self.matched_groups.get("content")
if not all([group_id, content]):
return False, "命令参数不完整"
logger.info(f"{self.log_prefix} 快速发送到群 {group_id}: {content[:50]}...")
success = await self.send_text_to_group(
text=content,
group_id=group_id,
platform="qq"
)
success = await self.send_text_to_group(text=content, group_id=group_id, platform="qq")
if success:
return True, f"✅ 消息已发送到群 {group_id}"
else:
return False, f"❌ 发送到群 {group_id} 失败"
except Exception as e:
logger.error(f"{self.log_prefix} 快速发送命令出错: {e}")
return False, f"发送失败: {str(e)}"
@@ -127,44 +113,37 @@ class SendQuickCommand(BaseCommand, MessageAPI):
@register_command
class SendPrivateCommand(BaseCommand, MessageAPI):
"""发送私聊消息命令"""
command_name = "pm"
command_description = "发送私聊消息到指定用户"
command_pattern = r"^/pm\s+(?P<user_id>\d+)\s+(?P<content>.+)$"
command_help = "使用方法: /pm <用户ID> <消息内容> - 发送私聊消息"
command_examples = [
"/pm 123456789 你好!",
"/pm 987654321 这是私聊消息"
]
command_examples = ["/pm 123456789 你好!", "/pm 987654321 这是私聊消息"]
enable_command = True
def __init__(self, message):
super().__init__(message)
self._services = {}
self.log_prefix = f"[Command:{self.command_name}]"
async def execute(self) -> Tuple[bool, Optional[str]]:
"""执行私聊发送命令"""
try:
user_id = self.matched_groups.get("user_id")
content = self.matched_groups.get("content")
if not all([user_id, content]):
return False, "命令参数不完整"
logger.info(f"{self.log_prefix} 发送私聊到用户 {user_id}: {content[:50]}...")
success = await self.send_text_to_user(
text=content,
user_id=user_id,
platform="qq"
)
success = await self.send_text_to_user(text=content, user_id=user_id, platform="qq")
if success:
return True, f"✅ 私聊消息已发送到用户 {user_id}"
else:
return False, f"❌ 发送私聊到用户 {user_id} 失败"
except Exception as e:
logger.error(f"{self.log_prefix} 私聊发送命令出错: {e}")
return False, f"私聊发送失败: {str(e)}"
return False, f"私聊发送失败: {str(e)}"

View File

@@ -6,173 +6,153 @@ import time
logger = get_logger("send_msg_with_context")
@register_command
class ContextAwareSendCommand(BaseCommand, MessageAPI):
"""上下文感知的发送消息命令,展示如何利用原始消息信息"""
command_name = "csend"
command_description = "带上下文感知的发送消息命令"
command_pattern = r"^/csend\s+(?P<target_type>group|user|here|reply)\s+(?P<target_id_or_content>.*?)(?:\s+(?P<content>.*))?$"
command_pattern = (
r"^/csend\s+(?P<target_type>group|user|here|reply)\s+(?P<target_id_or_content>.*?)(?:\s+(?P<content>.*))?$"
)
command_help = "使用方法: /csend <target_type> <参数> [内容]"
command_examples = [
"/csend group 123456789 大家好!",
"/csend user 987654321 私聊消息",
"/csend user 987654321 私聊消息",
"/csend here 在当前聊天发送",
"/csend reply 回复当前群/私聊"
"/csend reply 回复当前群/私聊",
]
enable_command = True
# 管理员用户ID列表示例
ADMIN_USERS = ["123456789", "987654321"] # 可以从配置文件读取
def __init__(self, message):
super().__init__(message)
self._services = {}
self.log_prefix = f"[Command:{self.command_name}]"
async def execute(self) -> Tuple[bool, Optional[str]]:
"""执行上下文感知的发送命令"""
try:
# 获取命令发送者信息
sender = self.message.message_info.user_info
current_group = self.message.message_info.group_info
# 权限检查
if not self._check_permission(sender.user_id):
return False, f"❌ 权限不足,只有管理员可以使用此命令\n你的ID: {sender.user_id}"
# 解析命令参数
target_type = self.matched_groups.get("target_type")
target_id_or_content = self.matched_groups.get("target_id_or_content", "")
content = self.matched_groups.get("content", "")
# 根据目标类型处理不同情况
if target_type == "here":
# 发送到当前聊天
return await self._send_to_current_chat(target_id_or_content, sender, current_group)
elif target_type == "reply":
# 回复到当前聊天,带发送者信息
return await self._send_reply_with_context(target_id_or_content, sender, current_group)
elif target_type in ["group", "user"]:
# 发送到指定目标
if not content:
return False, "指定群聊或用户时需要提供消息内容"
return await self._send_to_target(target_type, target_id_or_content, content, sender)
else:
return False, f"不支持的目标类型: {target_type}"
except Exception as e:
logger.error(f"{self.log_prefix} 执行上下文感知发送命令时出错: {e}")
return False, f"命令执行出错: {str(e)}"
def _check_permission(self, user_id: str) -> bool:
"""检查用户权限"""
return user_id in self.ADMIN_USERS
async def _send_to_current_chat(self, content: str, sender, current_group) -> Tuple[bool, str]:
"""发送到当前聊天"""
if not content:
return False, "消息内容不能为空"
# 构建带发送者信息的消息
timestamp = time.strftime("%H:%M:%S", time.localtime())
if current_group:
# 群聊
formatted_content = f"[管理员转发 {timestamp}] {sender.user_nickname}({sender.user_id}): {content}"
success = await self.send_text_to_group(
text=formatted_content,
group_id=current_group.group_id,
platform="qq"
text=formatted_content, group_id=current_group.group_id, platform="qq"
)
target_desc = f"当前群聊 {current_group.group_name}({current_group.group_id})"
else:
# 私聊
formatted_content = f"[管理员消息 {timestamp}]: {content}"
success = await self.send_text_to_user(
text=formatted_content,
user_id=sender.user_id,
platform="qq"
)
success = await self.send_text_to_user(text=formatted_content, user_id=sender.user_id, platform="qq")
target_desc = "当前私聊"
if success:
return True, f"✅ 消息已发送到{target_desc}"
else:
return False, f"❌ 发送到{target_desc}失败"
async def _send_reply_with_context(self, content: str, sender, current_group) -> Tuple[bool, str]:
"""发送回复,带完整上下文信息"""
if not content:
return False, "回复内容不能为空"
# 获取当前时间和环境信息
timestamp = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
# 构建上下文信息
context_info = [
f"📢 管理员回复 [{timestamp}]",
f"👤 发送者: {sender.user_nickname}({sender.user_id})",
]
if current_group:
context_info.append(f"👥 当前群聊: {current_group.group_name}({current_group.group_id})")
target_desc = f"群聊 {current_group.group_name}"
else:
context_info.append("💬 当前环境: 私聊")
target_desc = "私聊"
context_info.extend([
f"📝 回复内容: {content}",
"" * 30
])
context_info.extend([f"📝 回复内容: {content}", "" * 30])
formatted_content = "\n".join(context_info)
# 发送消息
if current_group:
success = await self.send_text_to_group(
text=formatted_content,
group_id=current_group.group_id,
platform="qq"
text=formatted_content, group_id=current_group.group_id, platform="qq"
)
else:
success = await self.send_text_to_user(
text=formatted_content,
user_id=sender.user_id,
platform="qq"
)
success = await self.send_text_to_user(text=formatted_content, user_id=sender.user_id, platform="qq")
if success:
return True, f"✅ 带上下文的回复已发送到{target_desc}"
else:
return False, f"❌ 发送上下文回复到{target_desc}失败"
async def _send_to_target(self, target_type: str, target_id: str, content: str, sender) -> Tuple[bool, str]:
"""发送到指定目标,带发送者追踪信息"""
timestamp = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
# 构建带追踪信息的消息
tracking_info = f"[管理转发 {timestamp}] 来自 {sender.user_nickname}({sender.user_id})"
formatted_content = f"{tracking_info}\n{content}"
if target_type == "group":
success = await self.send_text_to_group(
text=formatted_content,
group_id=target_id,
platform="qq"
)
success = await self.send_text_to_group(text=formatted_content, group_id=target_id, platform="qq")
target_desc = f"群聊 {target_id}"
else: # user
success = await self.send_text_to_user(
text=formatted_content,
user_id=target_id,
platform="qq"
)
success = await self.send_text_to_user(text=formatted_content, user_id=target_id, platform="qq")
target_desc = f"用户 {target_id}"
if success:
return True, f"✅ 带追踪信息的消息已发送到{target_desc}"
else:
@@ -182,21 +162,21 @@ class ContextAwareSendCommand(BaseCommand, MessageAPI):
@register_command
class MessageContextCommand(BaseCommand):
"""消息上下文命令,展示如何获取和利用上下文信息"""
command_name = "context"
command_description = "显示当前消息的完整上下文信息"
command_pattern = r"^/context$"
command_help = "使用方法: /context - 显示当前环境的上下文信息"
command_examples = ["/context"]
enable_command = True
async def execute(self) -> Tuple[bool, Optional[str]]:
"""显示上下文信息"""
try:
message = self.message
user = message.message_info.user_info
group = message.message_info.group_info
# 构建上下文信息
context_lines = [
"🌐 当前上下文信息",
@@ -212,42 +192,50 @@ class MessageContextCommand(BaseCommand):
f" 群名片: {user.user_cardname or ''}",
f" 平台: {user.platform}",
]
if group:
context_lines.extend([
"",
"👥 群聊环境:",
f" ID: {group.group_id}",
f": {group.group_name or '未知'}",
f" 平台: {group.platform}",
])
context_lines.extend(
[
"",
"👥聊环境:",
f"ID: {group.group_id}",
f" 群名: {group.group_name or '未知'}",
f" 平台: {group.platform}",
]
)
else:
context_lines.extend([
"",
"💬 私聊环境",
])
context_lines.extend(
[
"",
"💬 私聊环境",
]
)
# 添加聊天流信息
if hasattr(message, 'chat_stream') and message.chat_stream:
if hasattr(message, "chat_stream") and message.chat_stream:
chat_stream = message.chat_stream
context_lines.extend([
"",
"🔄 聊天流:",
f" 流ID: {chat_stream.stream_id}",
f" 激活状态: {'激活' if chat_stream.is_active else '非激活'}",
])
context_lines.extend(
[
"",
"🔄 聊天流:",
f" 流ID: {chat_stream.stream_id}",
f" 激活状态: {'激活' if chat_stream.is_active else '非激活'}",
]
)
# 添加消息内容信息
context_lines.extend([
"",
"📝 消息内容:",
f" 原始内容: {message.processed_plain_text}",
f" 消息长度: {len(message.processed_plain_text)} 字符",
f" 消息ID: {message.message_info.message_id}",
])
context_lines.extend(
[
"",
"📝 消息内容:",
f" 原始内容: {message.processed_plain_text}",
f" 消息长度: {len(message.processed_plain_text)} 字符",
f" 消息ID: {message.message_info.message_id}",
]
)
return True, "\n".join(context_lines)
except Exception as e:
logger.error(f"{self.log_prefix} 获取上下文信息时出错: {e}")
return False, f"获取上下文失败: {str(e)}"
return False, f"获取上下文失败: {str(e)}"

View File

@@ -1,2 +1,3 @@
"""测试插件动作模块"""
from . import mute_action # noqa

View File

@@ -22,21 +22,20 @@ class MuteAction(PluginAction):
"当有人刷屏时使用",
"当有人发了擦边,或者色情内容时使用",
"当有人要求禁言自己时使用",
"如果某人已经被禁言了,就不要再次禁言了,除非你想追加时间!!"
"如果某人已经被禁言了,就不要再次禁言了,除非你想追加时间!!",
]
enable_plugin = False # 启用插件
associated_types = ["command", "text"]
action_config_file_name = "mute_action_config.toml"
# 激活类型设置
focus_activation_type = ActionActivationType.LLM_JUDGE # Focus模式使用LLM判定确保谨慎
normal_activation_type = ActionActivationType.KEYWORD # Normal模式使用关键词激活快速响应
normal_activation_type = ActionActivationType.KEYWORD # Normal模式使用关键词激活快速响应
# 关键词设置用于Normal模式
activation_keywords = ["禁言", "mute", "ban", "silence"]
keyword_case_sensitive = False
# LLM判定提示词用于Focus模式
llm_judge_prompt = """
判定是否需要使用禁言动作的严格条件:
@@ -59,13 +58,13 @@ class MuteAction(PluginAction):
注意:禁言是严厉措施,只在明确违规或用户主动要求时使用。
宁可保守也不要误判,保护用户的发言权利。
"""
# Random激活概率备用
random_activation_probability = 0.05 # 设置很低的概率作为兜底
# 模式启用设置 - 禁言功能在所有模式下都可用
mode_enable = ChatMode.ALL
# 并行执行设置 - 禁言动作可以与回复并行执行,不覆盖回复内容
parallel_action = False
@@ -73,15 +72,15 @@ class MuteAction(PluginAction):
super().__init__(*args, **kwargs)
# 生成配置文件(如果不存在)
self._generate_config_if_needed()
def _generate_config_if_needed(self):
"""生成配置文件(如果不存在)"""
import os
# 获取动作文件所在目录
current_dir = os.path.dirname(os.path.abspath(__file__))
config_path = os.path.join(current_dir, "mute_action_config.toml")
if not os.path.exists(config_path):
config_content = """\
# 禁言动作配置文件
@@ -130,11 +129,10 @@ log_mute_history = true
def _get_template_message(self, target: str, duration_str: str, reason: str) -> str:
"""获取模板化的禁言消息"""
templates = self.config.get("templates", [
"好的,禁言 {target} {duration},理由:{reason}"
])
templates = self.config.get("templates", ["好的,禁言 {target} {duration},理由:{reason}"])
import random
template = random.choice(templates)
return template.format(target=target, duration=duration_str, reason=reason)
@@ -162,7 +160,7 @@ log_mute_history = true
# 获取时长限制配置
min_duration, max_duration, default_duration = self._get_duration_limits()
# 验证时长格式并转换
try:
duration_int = int(duration)
@@ -170,9 +168,11 @@ log_mute_history = true
error_msg = "禁言时长必须大于0"
logger.error(f"{self.log_prefix} {error_msg}")
error_templates = self.config.get("error_messages", ["禁言时长必须是正数哦~"])
await self.send_message_by_expressor(error_templates[2] if len(error_templates) > 2 else "禁言时长必须是正数哦~")
await self.send_message_by_expressor(
error_templates[2] if len(error_templates) > 2 else "禁言时长必须是正数哦~"
)
return False, error_msg
# 限制禁言时长范围
if duration_int < min_duration:
duration_int = min_duration
@@ -180,12 +180,14 @@ log_mute_history = true
elif duration_int > max_duration:
duration_int = max_duration
logger.info(f"{self.log_prefix} 禁言时长过长,调整为{max_duration}")
except (ValueError, TypeError) as e:
except (ValueError, TypeError):
error_msg = f"禁言时长格式无效: {duration}"
logger.error(f"{self.log_prefix} {error_msg}")
error_templates = self.config.get("error_messages", ["禁言时长必须是数字哦~"])
await self.send_message_by_expressor(error_templates[3] if len(error_templates) > 3 else "禁言时长必须是数字哦~")
await self.send_message_by_expressor(
error_templates[3] if len(error_templates) > 3 else "禁言时长必须是数字哦~"
)
return False, error_msg
# 获取用户ID
@@ -206,7 +208,7 @@ log_mute_history = true
# 发送表达情绪的消息
enable_formatting = self.config.get("enable_duration_formatting", True)
time_str = self._format_duration(duration_int) if enable_formatting else f"{duration_int}"
# 使用模板化消息
message = self._get_template_message(target, time_str, reason)
await self.send_message_by_expressor(message)

View File

@@ -1,7 +1,7 @@
import importlib
import pkgutil
import os
from typing import Dict, List, Tuple
from typing import Dict, Tuple
from src.common.logger_manager import get_logger
logger = get_logger("plugin_loader")
@@ -9,52 +9,53 @@ logger = get_logger("plugin_loader")
class PluginLoader:
"""统一的插件加载器负责加载插件的所有组件actions、commands等"""
def __init__(self):
self.loaded_actions = 0
self.loaded_commands = 0
self.plugin_stats: Dict[str, Dict[str, int]] = {} # 统计每个插件加载的组件数量
self.plugin_sources: Dict[str, str] = {} # 记录每个插件来自哪个路径
def load_all_plugins(self) -> Tuple[int, int]:
"""加载所有插件的所有组件
Returns:
Tuple[int, int]: (加载的动作数量, 加载的命令数量)
"""
# 定义插件搜索路径(优先级从高到低)
plugin_paths = [
("plugins", "plugins"), # 项目根目录的plugins文件夹
("src.plugins", os.path.join("src", "plugins")) # src下的plugins文件夹
("src.plugins", os.path.join("src", "plugins")), # src下的plugins文件夹
]
total_plugins_found = 0
for plugin_import_path, plugin_dir_path in plugin_paths:
try:
plugins_loaded = self._load_plugins_from_path(plugin_import_path, plugin_dir_path)
total_plugins_found += plugins_loaded
except Exception as e:
logger.error(f"从路径 {plugin_dir_path} 加载插件失败: {e}")
import traceback
logger.error(traceback.format_exc())
if total_plugins_found == 0:
logger.info("未找到任何插件目录或插件")
# 输出加载统计
self._log_loading_stats()
return self.loaded_actions, self.loaded_commands
def _load_plugins_from_path(self, plugin_import_path: str, plugin_dir_path: str) -> int:
"""从指定路径加载插件
Args:
plugin_import_path: 插件的导入路径 (如 "plugins""src.plugins")
plugin_dir_path: 插件目录的文件系统路径
Returns:
int: 找到的插件包数量
"""
@@ -62,9 +63,9 @@ class PluginLoader:
if not os.path.exists(plugin_dir_path):
logger.debug(f"插件目录 {plugin_dir_path} 不存在,跳过")
return 0
logger.info(f"正在从 {plugin_dir_path} 加载插件...")
# 导入插件包
try:
plugins_package = importlib.import_module(plugin_import_path)
@@ -72,122 +73,120 @@ class PluginLoader:
except ImportError as e:
logger.warning(f"导入插件包 {plugin_import_path} 失败: {e}")
return 0
# 遍历插件包中的所有子包
plugins_found = 0
for _, plugin_name, is_pkg in pkgutil.iter_modules(
plugins_package.__path__, plugins_package.__name__ + "."
):
for _, plugin_name, is_pkg in pkgutil.iter_modules(plugins_package.__path__, plugins_package.__name__ + "."):
if not is_pkg:
continue
logger.debug(f"检测到插件: {plugin_name}")
# 记录插件来源
self.plugin_sources[plugin_name] = plugin_dir_path
self._load_single_plugin(plugin_name)
plugins_found += 1
if plugins_found > 0:
logger.info(f"{plugin_dir_path} 找到 {plugins_found} 个插件包")
else:
logger.debug(f"{plugin_dir_path} 未找到任何插件包")
return plugins_found
def _load_single_plugin(self, plugin_name: str) -> None:
"""加载单个插件的所有组件
Args:
plugin_name: 插件名称
"""
plugin_stats = {"actions": 0, "commands": 0}
# 加载动作组件
actions_count = self._load_plugin_actions(plugin_name)
plugin_stats["actions"] = actions_count
self.loaded_actions += actions_count
# 加载命令组件
# 加载命令组件
commands_count = self._load_plugin_commands(plugin_name)
plugin_stats["commands"] = commands_count
self.loaded_commands += commands_count
# 记录插件统计信息
if actions_count > 0 or commands_count > 0:
self.plugin_stats[plugin_name] = plugin_stats
logger.info(f"插件 {plugin_name} 加载完成: {actions_count} 个动作, {commands_count} 个命令")
def _load_plugin_actions(self, plugin_name: str) -> int:
"""加载插件的动作组件
Args:
plugin_name: 插件名称
Returns:
int: 加载的动作数量
"""
loaded_count = 0
# 优先检查插件是否有actions子包
plugin_actions_path = f"{plugin_name}.actions"
plugin_actions_dir = plugin_name.replace(".", os.path.sep) + os.path.sep + "actions"
actions_loaded_from_subdir = False
# 首先尝试从actions子目录加载
if os.path.exists(plugin_actions_dir):
loaded_count += self._load_from_actions_subdir(plugin_name, plugin_actions_path, plugin_actions_dir)
if loaded_count > 0:
actions_loaded_from_subdir = True
# 如果actions子目录不存在或加载失败尝试从插件根目录加载
if not actions_loaded_from_subdir:
loaded_count += self._load_actions_from_root_dir(plugin_name)
return loaded_count
def _load_plugin_commands(self, plugin_name: str) -> int:
"""加载插件的命令组件
Args:
plugin_name: 插件名称
Returns:
int: 加载的命令数量
"""
loaded_count = 0
# 优先检查插件是否有commands子包
plugin_commands_path = f"{plugin_name}.commands"
plugin_commands_dir = plugin_name.replace(".", os.path.sep) + os.path.sep + "commands"
commands_loaded_from_subdir = False
# 首先尝试从commands子目录加载
if os.path.exists(plugin_commands_dir):
loaded_count += self._load_from_commands_subdir(plugin_name, plugin_commands_path, plugin_commands_dir)
if loaded_count > 0:
commands_loaded_from_subdir = True
# 如果commands子目录不存在或加载失败尝试从插件根目录加载
if not commands_loaded_from_subdir:
loaded_count += self._load_commands_from_root_dir(plugin_name)
return loaded_count
def _load_from_actions_subdir(self, plugin_name: str, plugin_actions_path: str, plugin_actions_dir: str) -> int:
"""从actions子目录加载动作"""
loaded_count = 0
try:
# 尝试导入插件的actions包
actions_module = importlib.import_module(plugin_actions_path)
logger.debug(f"成功加载插件动作模块: {plugin_actions_path}")
# 遍历actions目录中的所有Python文件
actions_dir = os.path.dirname(actions_module.__file__)
for file in os.listdir(actions_dir):
if file.endswith('.py') and file != '__init__.py':
if file.endswith(".py") and file != "__init__.py":
action_module_name = f"{plugin_actions_path}.{file[:-3]}"
try:
importlib.import_module(action_module_name)
@@ -195,25 +194,25 @@ class PluginLoader:
loaded_count += 1
except Exception as e:
logger.error(f"加载动作失败: {action_module_name}, 错误: {e}")
except ImportError as e:
logger.debug(f"插件 {plugin_name} 的actions子包导入失败: {e}")
return loaded_count
def _load_from_commands_subdir(self, plugin_name: str, plugin_commands_path: str, plugin_commands_dir: str) -> int:
"""从commands子目录加载命令"""
loaded_count = 0
try:
# 尝试导入插件的commands包
commands_module = importlib.import_module(plugin_commands_path)
logger.debug(f"成功加载插件命令模块: {plugin_commands_path}")
# 遍历commands目录中的所有Python文件
commands_dir = os.path.dirname(commands_module.__file__)
for file in os.listdir(commands_dir):
if file.endswith('.py') and file != '__init__.py':
if file.endswith(".py") and file != "__init__.py":
command_module_name = f"{plugin_commands_path}.{file[:-3]}"
try:
importlib.import_module(command_module_name)
@@ -221,29 +220,29 @@ class PluginLoader:
loaded_count += 1
except Exception as e:
logger.error(f"加载命令失败: {command_module_name}, 错误: {e}")
except ImportError as e:
logger.debug(f"插件 {plugin_name} 的commands子包导入失败: {e}")
return loaded_count
def _load_actions_from_root_dir(self, plugin_name: str) -> int:
"""从插件根目录加载动作文件"""
loaded_count = 0
try:
# 导入插件包本身
plugin_module = importlib.import_module(plugin_name)
logger.debug(f"尝试从插件根目录加载动作: {plugin_name}")
# 遍历插件根目录中的所有Python文件
plugin_dir = os.path.dirname(plugin_module.__file__)
for file in os.listdir(plugin_dir):
if file.endswith('.py') and file != '__init__.py':
if file.endswith(".py") and file != "__init__.py":
# 跳过非动作文件(根据命名约定)
if not (file.endswith('_action.py') or file.endswith('_actions.py') or 'action' in file):
if not (file.endswith("_action.py") or file.endswith("_actions.py") or "action" in file):
continue
action_module_name = f"{plugin_name}.{file[:-3]}"
try:
importlib.import_module(action_module_name)
@@ -251,29 +250,29 @@ class PluginLoader:
loaded_count += 1
except Exception as e:
logger.error(f"加载动作失败: {action_module_name}, 错误: {e}")
except ImportError as e:
logger.debug(f"插件 {plugin_name} 导入失败: {e}")
return loaded_count
def _load_commands_from_root_dir(self, plugin_name: str) -> int:
"""从插件根目录加载命令文件"""
loaded_count = 0
try:
# 导入插件包本身
plugin_module = importlib.import_module(plugin_name)
logger.debug(f"尝试从插件根目录加载命令: {plugin_name}")
# 遍历插件根目录中的所有Python文件
plugin_dir = os.path.dirname(plugin_module.__file__)
for file in os.listdir(plugin_dir):
if file.endswith('.py') and file != '__init__.py':
if file.endswith(".py") and file != "__init__.py":
# 跳过非命令文件(根据命名约定)
if not (file.endswith('_command.py') or file.endswith('_commands.py') or 'command' in file):
if not (file.endswith("_command.py") or file.endswith("_commands.py") or "command" in file):
continue
command_module_name = f"{plugin_name}.{file[:-3]}"
try:
importlib.import_module(command_module_name)
@@ -281,23 +280,25 @@ class PluginLoader:
loaded_count += 1
except Exception as e:
logger.error(f"加载命令失败: {command_module_name}, 错误: {e}")
except ImportError as e:
logger.debug(f"插件 {plugin_name} 导入失败: {e}")
return loaded_count
def _log_loading_stats(self) -> None:
"""输出加载统计信息"""
logger.success(f"插件加载完成: 总计 {self.loaded_actions} 个动作, {self.loaded_commands} 个命令")
if self.plugin_stats:
logger.info("插件加载详情:")
for plugin_name, stats in self.plugin_stats.items():
plugin_display_name = plugin_name.split('.')[-1] # 只显示插件名称,不显示完整路径
plugin_display_name = plugin_name.split(".")[-1] # 只显示插件名称,不显示完整路径
source_path = self.plugin_sources.get(plugin_name, "未知路径")
logger.info(f" {plugin_display_name} (来源: {source_path}): {stats['actions']} 动作, {stats['commands']} 命令")
logger.info(
f" {plugin_display_name} (来源: {source_path}): {stats['actions']} 动作, {stats['commands']} 命令"
)
# 创建全局插件加载器实例
plugin_loader = PluginLoader()
plugin_loader = PluginLoader()

View File

@@ -23,14 +23,14 @@ class TTSAction(PluginAction):
]
enable_plugin = True # 启用插件
associated_types = ["tts_text"]
focus_activation_type = ActionActivationType.LLM_JUDGE
normal_activation_type = ActionActivationType.KEYWORD
# 关键词配置 - Normal模式下使用关键词触发
activation_keywords = ["语音", "tts", "播报", "读出来", "语音播放", "", "朗读"]
keyword_case_sensitive = False
# 并行执行设置 - TTS可以与回复并行执行不覆盖回复内容
parallel_action = False

View File

@@ -22,11 +22,11 @@ class VTBAction(PluginAction):
]
enable_plugin = True # 启用插件
associated_types = ["vtb_text"]
# 激活类型设置
focus_activation_type = ActionActivationType.LLM_JUDGE # Focus模式使用LLM判定精确识别情感表达需求
normal_activation_type = ActionActivationType.RANDOM # Normal模式使用随机激活增加趣味性
normal_activation_type = ActionActivationType.RANDOM # Normal模式使用随机激活增加趣味性
# LLM判定提示词用于Focus模式
llm_judge_prompt = """
判定是否需要使用VTB虚拟主播动作的条件
@@ -41,7 +41,7 @@ class VTBAction(PluginAction):
3. 不涉及情感的日常对话
4. 已经有足够的情感表达
"""
# Random激活概率用于Normal模式
random_activation_probability = 0.08 # 较低概率,避免过度使用