feat:统一加载插件,区分内部插件和外部插件,提供示例命令发送插件

This commit is contained in:
SengokuCola
2025-06-09 23:51:12 +08:00
parent 450dad2355
commit c44811815a
22 changed files with 2029 additions and 182 deletions

View File

@@ -35,7 +35,7 @@ class PicAction(PluginAction):
"当有人要求你生成并发送一张图片时使用",
"当有人让你画一张图时使用",
]
enable_plugin = True
enable_plugin = False
action_config_file_name = "pic_action_config.toml"
# 激活类型设置

View File

@@ -0,0 +1,105 @@
# 发送消息命令插件
这个插件提供了多个便捷的消息发送命令,允许管理员向指定群聊或用户发送消息。
## 命令列表
### 1. `/send` - 基础发送命令
向指定群聊或用户发送文本消息。
**语法:**
```
/send <group|user> <ID> <消息内容>
```
**示例:**
```
/send group 123456789 大家好!
/send user 987654321 私聊消息
```
### 2. `/sendfull` - 增强发送命令
支持多种消息类型和平台的发送命令。
**语法:**
```
/sendfull <消息类型> <目标类型> <ID> [平台] <内容>
```
**消息类型:**
- `text` - 文本消息
- `image` - 图片消息提供图片URL
- `emoji` - 表情消息
**示例:**
```
/sendfull text group 123456789 qq 大家好!这是文本消息
/sendfull image user 987654321 https://example.com/image.jpg
/sendfull emoji group 123456789 😄
```
### 3. `/msg` - 快速群聊发送
快速向群聊发送文本消息的简化命令。
**语法:**
```
/msg <群ID> <消息内容>
```
**示例:**
```
/msg 123456789 大家好!
/msg 987654321 这是一条快速消息
```
### 4. `/pm` - 私聊发送
快速向用户发送私聊消息的命令。
**语法:**
```
/pm <用户ID> <消息内容>
```
**示例:**
```
/pm 123456789 你好!
/pm 987654321 这是私聊消息
```
## 使用前提
1. **目标存在**: 目标群聊或用户必须已经在机器人的数据库中存在对应的chat_stream记录
2. **权限要求**: 机器人必须在目标群聊中有发言权限
3. **管理员权限**: 这些命令通常需要管理员权限才能使用
## 错误处理
如果消息发送失败,可能的原因:
1. **目标不存在**: 指定的群ID或用户ID在数据库中找不到对应记录
2. **权限不足**: 机器人在目标群聊中没有发言权限
3. **网络问题**: 网络连接异常
4. **平台限制**: 目标平台的API限制
## 注意事项
1. **ID格式**: 群ID和用户ID必须是纯数字
2. **消息长度**: 注意平台对消息长度的限制
3. **图片格式**: 发送图片时需要提供有效的图片URL
4. **平台支持**: 目前主要支持QQ平台其他平台可能需要额外配置
## 安全建议
1. 限制这些命令的使用权限,避免滥用
2. 监控发送频率,防止刷屏
3. 定期检查发送日志,确保合规使用
## 故障排除
查看日志文件中的详细错误信息:
```
[INFO] [Command:send] 执行发送消息命令: group:123456789 -> 大家好!...
[ERROR] [Command:send] 发送群聊消息时出错: 未找到群ID为 123456789 的聊天流
```
根据错误信息进行相应的处理。

View File

@@ -0,0 +1,282 @@
from src.common.logger_manager import get_logger
from src.chat.message_receive.command_handler import BaseCommand, register_command
from typing import Tuple, Optional
import json
logger = get_logger("message_info_command")
@register_command
class MessageInfoCommand(BaseCommand):
"""消息信息查看命令,展示发送命令的原始消息和相关信息"""
command_name = "msginfo"
command_description = "查看发送命令的原始消息信息"
command_pattern = r"^/msginfo(?:\s+(?P<detail>full|simple))?$"
command_help = "使用方法: /msginfo [full|simple] - 查看当前消息的详细信息"
command_examples = ["/msginfo", "/msginfo full", "/msginfo simple"]
enable_command = True
async def execute(self) -> Tuple[bool, Optional[str]]:
"""执行消息信息查看命令"""
try:
detail_level = self.matched_groups.get("detail", "simple")
logger.info(f"{self.log_prefix} 查看消息信息,详细级别: {detail_level}")
if detail_level == "full":
info_text = self._get_full_message_info()
else:
info_text = self._get_simple_message_info()
return True, info_text
except Exception as e:
logger.error(f"{self.log_prefix} 获取消息信息时出错: {e}")
return False, f"获取消息信息失败: {str(e)}"
def _get_simple_message_info(self) -> str:
"""获取简化的消息信息"""
message = self.message
# 基础信息
info_lines = [
"📨 消息信息概览",
f"🆔 消息ID: {message.message_info.message_id}",
f"⏰ 时间: {message.message_info.time}",
f"🌐 平台: {message.message_info.platform}",
]
# 发送者信息
user = message.message_info.user_info
info_lines.extend([
"",
"👤 发送者信息:",
f" 用户ID: {user.user_id}",
f" 昵称: {user.user_nickname}",
f" 群名片: {user.user_cardname or ''}",
])
# 群聊信息(如果是群聊)
if message.message_info.group_info:
group = message.message_info.group_info
info_lines.extend([
"",
"👥 群聊信息:",
f" 群ID: {group.group_id}",
f" 群名: {group.group_name or '未知'}",
])
else:
info_lines.extend([
"",
"💬 消息类型: 私聊消息",
])
# 消息内容
info_lines.extend([
"",
"📝 消息内容:",
f" 原始文本: {message.processed_plain_text}",
f" 是否表情: {'' if getattr(message, 'is_emoji', False) else ''}",
])
# 聊天流信息
if hasattr(message, 'chat_stream') and message.chat_stream:
chat_stream = message.chat_stream
info_lines.extend([
"",
"🔄 聊天流信息:",
f" 流ID: {chat_stream.stream_id}",
f" 是否激活: {'' if chat_stream.is_active else ''}",
])
return "\n".join(info_lines)
def _get_full_message_info(self) -> str:
"""获取完整的消息信息(包含技术细节)"""
message = self.message
info_lines = [
"📨 完整消息信息",
"=" * 40,
]
# 消息基础信息
info_lines.extend([
"",
"🔍 基础消息信息:",
f" 消息ID: {message.message_info.message_id}",
f" 时间戳: {message.message_info.time}",
f" 平台: {message.message_info.platform}",
f" 处理后文本: {message.processed_plain_text}",
f" 详细文本: {message.detailed_plain_text[:100]}{'...' if len(message.detailed_plain_text) > 100 else ''}",
])
# 用户详细信息
user = message.message_info.user_info
info_lines.extend([
"",
"👤 发送者详细信息:",
f" 用户ID: {user.user_id}",
f" 昵称: {user.user_nickname}",
f" 群名片: {user.user_cardname or ''}",
f" 平台: {user.platform}",
])
# 群聊详细信息
if message.message_info.group_info:
group = message.message_info.group_info
info_lines.extend([
"",
"👥 群聊详细信息:",
f" 群ID: {group.group_id}",
f" 群名: {group.group_name or '未知'}",
f" 平台: {group.platform}",
])
else:
info_lines.append("\n💬 消息类型: 私聊消息")
# 消息段信息
if message.message_segment:
info_lines.extend([
"",
"📦 消息段信息:",
f" 类型: {message.message_segment.type}",
f" 数据类型: {type(message.message_segment.data).__name__}",
f" 数据预览: {str(message.message_segment.data)[:200]}{'...' if len(str(message.message_segment.data)) > 200 else ''}",
])
# 聊天流详细信息
if hasattr(message, 'chat_stream') and message.chat_stream:
chat_stream = message.chat_stream
info_lines.extend([
"",
"🔄 聊天流详细信息:",
f" 流ID: {chat_stream.stream_id}",
f" 平台: {chat_stream.platform}",
f" 是否激活: {'' if chat_stream.is_active else ''}",
f" 用户信息: {chat_stream.user_info.user_nickname} ({chat_stream.user_info.user_id})",
f" 群信息: {getattr(chat_stream.group_info, 'group_name', '私聊') if chat_stream.group_info else '私聊'}",
])
# 回复信息
if hasattr(message, 'reply') and message.reply:
info_lines.extend([
"",
"↩️ 回复信息:",
f" 回复消息ID: {message.reply.message_info.message_id}",
f" 回复内容: {message.reply.processed_plain_text[:100]}{'...' if len(message.reply.processed_plain_text) > 100 else ''}",
])
# 原始消息数据(如果存在)
if hasattr(message, 'raw_message') and message.raw_message:
info_lines.extend([
"",
"🗂️ 原始消息数据:",
f" 数据类型: {type(message.raw_message).__name__}",
f" 数据大小: {len(str(message.raw_message))} 字符",
])
return "\n".join(info_lines)
@register_command
class SenderInfoCommand(BaseCommand):
"""发送者信息命令,快速查看发送者信息"""
command_name = "whoami"
command_description = "查看发送命令的用户信息"
command_pattern = r"^/whoami$"
command_help = "使用方法: /whoami - 查看你的用户信息"
command_examples = ["/whoami"]
enable_command = True
async def execute(self) -> Tuple[bool, Optional[str]]:
"""执行发送者信息查看命令"""
try:
user = self.message.message_info.user_info
group = self.message.message_info.group_info
info_lines = [
"👤 你的身份信息",
f"🆔 用户ID: {user.user_id}",
f"📝 昵称: {user.user_nickname}",
f"🏷️ 群名片: {user.user_cardname or ''}",
f"🌐 平台: {user.platform}",
]
if group:
info_lines.extend([
"",
"👥 当前群聊:",
f"🆔 群ID: {group.group_id}",
f"📝 群名: {group.group_name or '未知'}",
])
else:
info_lines.append("\n💬 当前在私聊中")
return True, "\n".join(info_lines)
except Exception as e:
logger.error(f"{self.log_prefix} 获取发送者信息时出错: {e}")
return False, f"获取发送者信息失败: {str(e)}"
@register_command
class ChatStreamInfoCommand(BaseCommand):
"""聊天流信息命令"""
command_name = "streaminfo"
command_description = "查看当前聊天流的详细信息"
command_pattern = r"^/streaminfo$"
command_help = "使用方法: /streaminfo - 查看当前聊天流信息"
command_examples = ["/streaminfo"]
enable_command = True
async def execute(self) -> Tuple[bool, Optional[str]]:
"""执行聊天流信息查看命令"""
try:
if not hasattr(self.message, 'chat_stream') or not self.message.chat_stream:
return False, "无法获取聊天流信息"
chat_stream = self.message.chat_stream
info_lines = [
"🔄 聊天流信息",
f"🆔 流ID: {chat_stream.stream_id}",
f"🌐 平台: {chat_stream.platform}",
f"⚡ 状态: {'激活' if chat_stream.is_active else '非激活'}",
]
# 用户信息
if chat_stream.user_info:
info_lines.extend([
"",
"👤 关联用户:",
f" ID: {chat_stream.user_info.user_id}",
f" 昵称: {chat_stream.user_info.user_nickname}",
])
# 群信息
if chat_stream.group_info:
info_lines.extend([
"",
"👥 关联群聊:",
f" 群ID: {chat_stream.group_info.group_id}",
f" 群名: {chat_stream.group_info.group_name or '未知'}",
])
else:
info_lines.append("\n💬 类型: 私聊流")
# 最近消息统计
if hasattr(chat_stream, 'last_messages'):
msg_count = len(chat_stream.last_messages)
info_lines.extend([
"",
f"📈 消息统计: 记录了 {msg_count} 条最近消息",
])
return True, "\n".join(info_lines)
except Exception as e:
logger.error(f"{self.log_prefix} 获取聊天流信息时出错: {e}")
return False, f"获取聊天流信息失败: {str(e)}"

View File

@@ -0,0 +1,119 @@
from src.common.logger_manager import get_logger
from src.chat.message_receive.command_handler import BaseCommand, register_command
from src.chat.actions.plugin_api.message_api import MessageAPI
from typing import Tuple, Optional
logger = get_logger("send_msg_command")
@register_command
class SendMessageCommand(BaseCommand, MessageAPI):
"""发送消息命令,可以向指定群聊或私聊发送消息"""
command_name = "send"
command_description = "向指定群聊或私聊发送消息"
command_pattern = r"^/send\s+(?P<target_type>group|user)\s+(?P<target_id>\d+)\s+(?P<content>.+)$"
command_help = "使用方法: /send <group|user> <ID> <消息内容> - 发送消息到指定群聊或用户"
command_examples = [
"/send group 123456789 大家好!",
"/send user 987654321 私聊消息"
]
enable_command = True
def __init__(self, message):
super().__init__(message)
# 初始化MessageAPI需要的服务虽然这里不会用到但保持一致性
self._services = {}
self.log_prefix = f"[Command:{self.command_name}]"
async def execute(self) -> Tuple[bool, Optional[str]]:
"""执行发送消息命令
Returns:
Tuple[bool, Optional[str]]: (是否执行成功, 回复消息)
"""
try:
# 获取匹配到的参数
target_type = self.matched_groups.get("target_type") # group 或 user
target_id = self.matched_groups.get("target_id") # 群ID或用户ID
content = self.matched_groups.get("content") # 消息内容
if not all([target_type, target_id, content]):
return False, "命令参数不完整,请检查格式"
logger.info(f"{self.log_prefix} 执行发送消息命令: {target_type}:{target_id} -> {content[:50]}...")
# 根据目标类型调用不同的发送方法
if target_type == "group":
success = await self._send_to_group(target_id, content)
target_desc = f"群聊 {target_id}"
elif target_type == "user":
success = await self._send_to_user(target_id, content)
target_desc = f"用户 {target_id}"
else:
return False, f"不支持的目标类型: {target_type},只支持 group 或 user"
# 返回执行结果
if success:
return True, f"✅ 消息已成功发送到 {target_desc}"
else:
return False, f"❌ 消息发送失败,可能是目标 {target_desc} 不存在或没有权限"
except Exception as e:
logger.error(f"{self.log_prefix} 执行发送消息命令时出错: {e}")
return False, f"命令执行出错: {str(e)}"
async def _send_to_group(self, group_id: str, content: str) -> bool:
"""发送消息到群聊
Args:
group_id: 群聊ID
content: 消息内容
Returns:
bool: 是否发送成功
"""
try:
success = await self.send_text_to_group(
text=content,
group_id=group_id,
platform="qq" # 默认使用QQ平台
)
if success:
logger.info(f"{self.log_prefix} 成功发送消息到群聊 {group_id}")
else:
logger.warning(f"{self.log_prefix} 发送消息到群聊 {group_id} 失败")
return success
except Exception as e:
logger.error(f"{self.log_prefix} 发送群聊消息时出错: {e}")
return False
async def _send_to_user(self, user_id: str, content: str) -> bool:
"""发送消息到私聊
Args:
user_id: 用户ID
content: 消息内容
Returns:
bool: 是否发送成功
"""
try:
success = await self.send_text_to_user(
text=content,
user_id=user_id,
platform="qq" # 默认使用QQ平台
)
if success:
logger.info(f"{self.log_prefix} 成功发送消息到用户 {user_id}")
else:
logger.warning(f"{self.log_prefix} 发送消息到用户 {user_id} 失败")
return success
except Exception as e:
logger.error(f"{self.log_prefix} 发送私聊消息时出错: {e}")
return False

View File

@@ -0,0 +1,170 @@
from src.common.logger_manager import get_logger
from src.chat.message_receive.command_handler import BaseCommand, register_command
from src.chat.actions.plugin_api.message_api import MessageAPI
from typing import Tuple, Optional
logger = get_logger("send_msg_enhanced")
@register_command
class SendMessageEnhancedCommand(BaseCommand, MessageAPI):
"""增强版发送消息命令,支持多种消息类型和平台"""
command_name = "sendfull"
command_description = "增强版消息发送命令,支持多种类型和平台"
command_pattern = r"^/sendfull\s+(?P<msg_type>text|image|emoji)\s+(?P<target_type>group|user)\s+(?P<target_id>\d+)(?:\s+(?P<platform>\w+))?\s+(?P<content>.+)$"
command_help = "使用方法: /sendfull <消息类型> <目标类型> <ID> [平台] <内容>"
command_examples = [
"/sendfull text group 123456789 qq 大家好!这是文本消息",
"/sendfull image user 987654321 https://example.com/image.jpg",
"/sendfull emoji group 123456789 😄",
"/sendfull text user 987654321 qq 私聊消息"
]
enable_command = True
def __init__(self, message):
super().__init__(message)
self._services = {}
self.log_prefix = f"[Command:{self.command_name}]"
async def execute(self) -> Tuple[bool, Optional[str]]:
"""执行增强版发送消息命令"""
try:
# 获取匹配参数
msg_type = self.matched_groups.get("msg_type") # 消息类型: text/image/emoji
target_type = self.matched_groups.get("target_type") # 目标类型: group/user
target_id = self.matched_groups.get("target_id") # 目标ID
platform = self.matched_groups.get("platform") or "qq" # 平台默认qq
content = self.matched_groups.get("content") # 内容
if not all([msg_type, target_type, target_id, content]):
return False, "命令参数不完整,请检查格式"
# 验证消息类型
valid_types = ["text", "image", "emoji"]
if msg_type not in valid_types:
return False, f"不支持的消息类型: {msg_type},支持的类型: {', '.join(valid_types)}"
# 验证目标类型
if target_type not in ["group", "user"]:
return False, "目标类型只能是 group 或 user"
logger.info(f"{self.log_prefix} 执行发送命令: {msg_type} -> {target_type}:{target_id} (平台:{platform})")
# 根据消息类型和目标类型发送消息
is_group = (target_type == "group")
success = await self.send_message_to_target(
message_type=msg_type,
content=content,
platform=platform,
target_id=target_id,
is_group=is_group
)
# 构建结果消息
target_desc = f"{'群聊' if is_group else '用户'} {target_id} (平台: {platform})"
msg_type_desc = {
"text": "文本",
"image": "图片",
"emoji": "表情"
}.get(msg_type, msg_type)
if success:
return True, f"{msg_type_desc}消息已成功发送到 {target_desc}"
else:
return False, f"{msg_type_desc}消息发送失败,可能是目标 {target_desc} 不存在或没有权限"
except Exception as e:
logger.error(f"{self.log_prefix} 执行增强发送命令时出错: {e}")
return False, f"命令执行出错: {str(e)}"
@register_command
class SendQuickCommand(BaseCommand, MessageAPI):
"""快速发送文本消息命令"""
command_name = "msg"
command_description = "快速发送文本消息到群聊"
command_pattern = r"^/msg\s+(?P<group_id>\d+)\s+(?P<content>.+)$"
command_help = "使用方法: /msg <群ID> <消息内容> - 快速发送文本到指定群聊"
command_examples = [
"/msg 123456789 大家好!",
"/msg 987654321 这是一条快速消息"
]
enable_command = True
def __init__(self, message):
super().__init__(message)
self._services = {}
self.log_prefix = f"[Command:{self.command_name}]"
async def execute(self) -> Tuple[bool, Optional[str]]:
"""执行快速发送消息命令"""
try:
group_id = self.matched_groups.get("group_id")
content = self.matched_groups.get("content")
if not all([group_id, content]):
return False, "命令参数不完整"
logger.info(f"{self.log_prefix} 快速发送到群 {group_id}: {content[:50]}...")
success = await self.send_text_to_group(
text=content,
group_id=group_id,
platform="qq"
)
if success:
return True, f"✅ 消息已发送到群 {group_id}"
else:
return False, f"❌ 发送到群 {group_id} 失败"
except Exception as e:
logger.error(f"{self.log_prefix} 快速发送命令出错: {e}")
return False, f"发送失败: {str(e)}"
@register_command
class SendPrivateCommand(BaseCommand, MessageAPI):
"""发送私聊消息命令"""
command_name = "pm"
command_description = "发送私聊消息到指定用户"
command_pattern = r"^/pm\s+(?P<user_id>\d+)\s+(?P<content>.+)$"
command_help = "使用方法: /pm <用户ID> <消息内容> - 发送私聊消息"
command_examples = [
"/pm 123456789 你好!",
"/pm 987654321 这是私聊消息"
]
enable_command = True
def __init__(self, message):
super().__init__(message)
self._services = {}
self.log_prefix = f"[Command:{self.command_name}]"
async def execute(self) -> Tuple[bool, Optional[str]]:
"""执行私聊发送命令"""
try:
user_id = self.matched_groups.get("user_id")
content = self.matched_groups.get("content")
if not all([user_id, content]):
return False, "命令参数不完整"
logger.info(f"{self.log_prefix} 发送私聊到用户 {user_id}: {content[:50]}...")
success = await self.send_text_to_user(
text=content,
user_id=user_id,
platform="qq"
)
if success:
return True, f"✅ 私聊消息已发送到用户 {user_id}"
else:
return False, f"❌ 发送私聊到用户 {user_id} 失败"
except Exception as e:
logger.error(f"{self.log_prefix} 私聊发送命令出错: {e}")
return False, f"私聊发送失败: {str(e)}"

View File

@@ -0,0 +1,253 @@
from src.common.logger_manager import get_logger
from src.chat.message_receive.command_handler import BaseCommand, register_command
from src.chat.actions.plugin_api.message_api import MessageAPI
from typing import Tuple, Optional
import time
logger = get_logger("send_msg_with_context")
@register_command
class ContextAwareSendCommand(BaseCommand, MessageAPI):
"""上下文感知的发送消息命令,展示如何利用原始消息信息"""
command_name = "csend"
command_description = "带上下文感知的发送消息命令"
command_pattern = r"^/csend\s+(?P<target_type>group|user|here|reply)\s+(?P<target_id_or_content>.*?)(?:\s+(?P<content>.*))?$"
command_help = "使用方法: /csend <target_type> <参数> [内容]"
command_examples = [
"/csend group 123456789 大家好!",
"/csend user 987654321 私聊消息",
"/csend here 在当前聊天发送",
"/csend reply 回复当前群/私聊"
]
enable_command = True
# 管理员用户ID列表示例
ADMIN_USERS = ["123456789", "987654321"] # 可以从配置文件读取
def __init__(self, message):
super().__init__(message)
self._services = {}
self.log_prefix = f"[Command:{self.command_name}]"
async def execute(self) -> Tuple[bool, Optional[str]]:
"""执行上下文感知的发送命令"""
try:
# 获取命令发送者信息
sender = self.message.message_info.user_info
current_group = self.message.message_info.group_info
# 权限检查
if not self._check_permission(sender.user_id):
return False, f"❌ 权限不足,只有管理员可以使用此命令\n你的ID: {sender.user_id}"
# 解析命令参数
target_type = self.matched_groups.get("target_type")
target_id_or_content = self.matched_groups.get("target_id_or_content", "")
content = self.matched_groups.get("content", "")
# 根据目标类型处理不同情况
if target_type == "here":
# 发送到当前聊天
return await self._send_to_current_chat(target_id_or_content, sender, current_group)
elif target_type == "reply":
# 回复到当前聊天,带发送者信息
return await self._send_reply_with_context(target_id_or_content, sender, current_group)
elif target_type in ["group", "user"]:
# 发送到指定目标
if not content:
return False, "指定群聊或用户时需要提供消息内容"
return await self._send_to_target(target_type, target_id_or_content, content, sender)
else:
return False, f"不支持的目标类型: {target_type}"
except Exception as e:
logger.error(f"{self.log_prefix} 执行上下文感知发送命令时出错: {e}")
return False, f"命令执行出错: {str(e)}"
def _check_permission(self, user_id: str) -> bool:
"""检查用户权限"""
return user_id in self.ADMIN_USERS
async def _send_to_current_chat(self, content: str, sender, current_group) -> Tuple[bool, str]:
"""发送到当前聊天"""
if not content:
return False, "消息内容不能为空"
# 构建带发送者信息的消息
timestamp = time.strftime("%H:%M:%S", time.localtime())
if current_group:
# 群聊
formatted_content = f"[管理员转发 {timestamp}] {sender.user_nickname}({sender.user_id}): {content}"
success = await self.send_text_to_group(
text=formatted_content,
group_id=current_group.group_id,
platform="qq"
)
target_desc = f"当前群聊 {current_group.group_name}({current_group.group_id})"
else:
# 私聊
formatted_content = f"[管理员消息 {timestamp}]: {content}"
success = await self.send_text_to_user(
text=formatted_content,
user_id=sender.user_id,
platform="qq"
)
target_desc = "当前私聊"
if success:
return True, f"✅ 消息已发送到{target_desc}"
else:
return False, f"❌ 发送到{target_desc}失败"
async def _send_reply_with_context(self, content: str, sender, current_group) -> Tuple[bool, str]:
"""发送回复,带完整上下文信息"""
if not content:
return False, "回复内容不能为空"
# 获取当前时间和环境信息
timestamp = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
# 构建上下文信息
context_info = [
f"📢 管理员回复 [{timestamp}]",
f"👤 发送者: {sender.user_nickname}({sender.user_id})",
]
if current_group:
context_info.append(f"👥 当前群聊: {current_group.group_name}({current_group.group_id})")
target_desc = f"群聊 {current_group.group_name}"
else:
context_info.append("💬 当前环境: 私聊")
target_desc = "私聊"
context_info.extend([
f"📝 回复内容: {content}",
"" * 30
])
formatted_content = "\n".join(context_info)
# 发送消息
if current_group:
success = await self.send_text_to_group(
text=formatted_content,
group_id=current_group.group_id,
platform="qq"
)
else:
success = await self.send_text_to_user(
text=formatted_content,
user_id=sender.user_id,
platform="qq"
)
if success:
return True, f"✅ 带上下文的回复已发送到{target_desc}"
else:
return False, f"❌ 发送上下文回复到{target_desc}失败"
async def _send_to_target(self, target_type: str, target_id: str, content: str, sender) -> Tuple[bool, str]:
"""发送到指定目标,带发送者追踪信息"""
timestamp = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
# 构建带追踪信息的消息
tracking_info = f"[管理转发 {timestamp}] 来自 {sender.user_nickname}({sender.user_id})"
formatted_content = f"{tracking_info}\n{content}"
if target_type == "group":
success = await self.send_text_to_group(
text=formatted_content,
group_id=target_id,
platform="qq"
)
target_desc = f"群聊 {target_id}"
else: # user
success = await self.send_text_to_user(
text=formatted_content,
user_id=target_id,
platform="qq"
)
target_desc = f"用户 {target_id}"
if success:
return True, f"✅ 带追踪信息的消息已发送到{target_desc}"
else:
return False, f"❌ 发送到{target_desc}失败"
@register_command
class MessageContextCommand(BaseCommand):
"""消息上下文命令,展示如何获取和利用上下文信息"""
command_name = "context"
command_description = "显示当前消息的完整上下文信息"
command_pattern = r"^/context$"
command_help = "使用方法: /context - 显示当前环境的上下文信息"
command_examples = ["/context"]
enable_command = True
async def execute(self) -> Tuple[bool, Optional[str]]:
"""显示上下文信息"""
try:
message = self.message
user = message.message_info.user_info
group = message.message_info.group_info
# 构建上下文信息
context_lines = [
"🌐 当前上下文信息",
"=" * 30,
"",
"⏰ 时间信息:",
f" 消息时间: {time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(message.message_info.time))}",
f" 时间戳: {message.message_info.time}",
"",
"👤 发送者:",
f" 用户ID: {user.user_id}",
f" 昵称: {user.user_nickname}",
f" 群名片: {user.user_cardname or ''}",
f" 平台: {user.platform}",
]
if group:
context_lines.extend([
"",
"👥 群聊环境:",
f" 群ID: {group.group_id}",
f" 群名: {group.group_name or '未知'}",
f" 平台: {group.platform}",
])
else:
context_lines.extend([
"",
"💬 私聊环境",
])
# 添加聊天流信息
if hasattr(message, 'chat_stream') and message.chat_stream:
chat_stream = message.chat_stream
context_lines.extend([
"",
"🔄 聊天流:",
f" 流ID: {chat_stream.stream_id}",
f" 激活状态: {'激活' if chat_stream.is_active else '非激活'}",
])
# 添加消息内容信息
context_lines.extend([
"",
"📝 消息内容:",
f" 原始内容: {message.processed_plain_text}",
f" 消息长度: {len(message.processed_plain_text)} 字符",
f" 消息ID: {message.message_info.message_id}",
])
return True, "\n".join(context_lines)
except Exception as e:
logger.error(f"{self.log_prefix} 获取上下文信息时出错: {e}")
return False, f"获取上下文失败: {str(e)}"

View File

@@ -1,36 +0,0 @@
from src.common.logger_manager import get_logger
from src.chat.message_receive.command_handler import BaseCommand, register_command
from typing import Tuple, Optional
logger = get_logger("echo_command")
@register_command
class EchoCommand(BaseCommand):
"""回显命令,将用户输入的内容回显"""
command_name = "echo"
command_description = "回显命令,将用户输入的内容回显"
command_pattern = r"^/echo\s+(?P<content>.+)$" # 匹配 /echo 后面的所有内容
command_help = "使用方法: /echo <内容> - 回显你输入的内容"
command_examples = ["/echo 你好,世界!", "/echo 这是一个测试"]
enable_command = True
async def execute(self) -> Tuple[bool, Optional[str]]:
"""执行回显命令
Returns:
Tuple[bool, Optional[str]]: (是否执行成功, 回复消息)
"""
try:
# 获取匹配到的内容
content = self.matched_groups.get("content")
if not content:
return False, "请提供要回显的内容"
logger.info(f"{self.log_prefix} 执行回显命令: {content}")
return True, f"🔄 {content}"
except Exception as e:
logger.error(f"{self.log_prefix} 执行回显命令时出错: {e}")
return False, f"执行命令时出错: {str(e)}"

View File

@@ -0,0 +1,303 @@
import importlib
import pkgutil
import os
from typing import Dict, List, Tuple
from src.common.logger_manager import get_logger
logger = get_logger("plugin_loader")
class PluginLoader:
"""统一的插件加载器负责加载插件的所有组件actions、commands等"""
def __init__(self):
self.loaded_actions = 0
self.loaded_commands = 0
self.plugin_stats: Dict[str, Dict[str, int]] = {} # 统计每个插件加载的组件数量
self.plugin_sources: Dict[str, str] = {} # 记录每个插件来自哪个路径
def load_all_plugins(self) -> Tuple[int, int]:
"""加载所有插件的所有组件
Returns:
Tuple[int, int]: (加载的动作数量, 加载的命令数量)
"""
# 定义插件搜索路径(优先级从高到低)
plugin_paths = [
("plugins", "plugins"), # 项目根目录的plugins文件夹
("src.plugins", os.path.join("src", "plugins")) # src下的plugins文件夹
]
total_plugins_found = 0
for plugin_import_path, plugin_dir_path in plugin_paths:
try:
plugins_loaded = self._load_plugins_from_path(plugin_import_path, plugin_dir_path)
total_plugins_found += plugins_loaded
except Exception as e:
logger.error(f"从路径 {plugin_dir_path} 加载插件失败: {e}")
import traceback
logger.error(traceback.format_exc())
if total_plugins_found == 0:
logger.info("未找到任何插件目录或插件")
# 输出加载统计
self._log_loading_stats()
return self.loaded_actions, self.loaded_commands
def _load_plugins_from_path(self, plugin_import_path: str, plugin_dir_path: str) -> int:
"""从指定路径加载插件
Args:
plugin_import_path: 插件的导入路径 (如 "plugins""src.plugins")
plugin_dir_path: 插件目录的文件系统路径
Returns:
int: 找到的插件包数量
"""
# 检查插件目录是否存在
if not os.path.exists(plugin_dir_path):
logger.debug(f"插件目录 {plugin_dir_path} 不存在,跳过")
return 0
logger.info(f"正在从 {plugin_dir_path} 加载插件...")
# 导入插件包
try:
plugins_package = importlib.import_module(plugin_import_path)
logger.info(f"成功导入插件包: {plugin_import_path}")
except ImportError as e:
logger.warning(f"导入插件包 {plugin_import_path} 失败: {e}")
return 0
# 遍历插件包中的所有子包
plugins_found = 0
for _, plugin_name, is_pkg in pkgutil.iter_modules(
plugins_package.__path__, plugins_package.__name__ + "."
):
if not is_pkg:
continue
logger.debug(f"检测到插件: {plugin_name}")
# 记录插件来源
self.plugin_sources[plugin_name] = plugin_dir_path
self._load_single_plugin(plugin_name)
plugins_found += 1
if plugins_found > 0:
logger.info(f"{plugin_dir_path} 找到 {plugins_found} 个插件包")
else:
logger.debug(f"{plugin_dir_path} 未找到任何插件包")
return plugins_found
def _load_single_plugin(self, plugin_name: str) -> None:
"""加载单个插件的所有组件
Args:
plugin_name: 插件名称
"""
plugin_stats = {"actions": 0, "commands": 0}
# 加载动作组件
actions_count = self._load_plugin_actions(plugin_name)
plugin_stats["actions"] = actions_count
self.loaded_actions += actions_count
# 加载命令组件
commands_count = self._load_plugin_commands(plugin_name)
plugin_stats["commands"] = commands_count
self.loaded_commands += commands_count
# 记录插件统计信息
if actions_count > 0 or commands_count > 0:
self.plugin_stats[plugin_name] = plugin_stats
logger.info(f"插件 {plugin_name} 加载完成: {actions_count} 个动作, {commands_count} 个命令")
def _load_plugin_actions(self, plugin_name: str) -> int:
"""加载插件的动作组件
Args:
plugin_name: 插件名称
Returns:
int: 加载的动作数量
"""
loaded_count = 0
# 优先检查插件是否有actions子包
plugin_actions_path = f"{plugin_name}.actions"
plugin_actions_dir = plugin_name.replace(".", os.path.sep) + os.path.sep + "actions"
actions_loaded_from_subdir = False
# 首先尝试从actions子目录加载
if os.path.exists(plugin_actions_dir):
loaded_count += self._load_from_actions_subdir(plugin_name, plugin_actions_path, plugin_actions_dir)
if loaded_count > 0:
actions_loaded_from_subdir = True
# 如果actions子目录不存在或加载失败尝试从插件根目录加载
if not actions_loaded_from_subdir:
loaded_count += self._load_actions_from_root_dir(plugin_name)
return loaded_count
def _load_plugin_commands(self, plugin_name: str) -> int:
"""加载插件的命令组件
Args:
plugin_name: 插件名称
Returns:
int: 加载的命令数量
"""
loaded_count = 0
# 优先检查插件是否有commands子包
plugin_commands_path = f"{plugin_name}.commands"
plugin_commands_dir = plugin_name.replace(".", os.path.sep) + os.path.sep + "commands"
commands_loaded_from_subdir = False
# 首先尝试从commands子目录加载
if os.path.exists(plugin_commands_dir):
loaded_count += self._load_from_commands_subdir(plugin_name, plugin_commands_path, plugin_commands_dir)
if loaded_count > 0:
commands_loaded_from_subdir = True
# 如果commands子目录不存在或加载失败尝试从插件根目录加载
if not commands_loaded_from_subdir:
loaded_count += self._load_commands_from_root_dir(plugin_name)
return loaded_count
def _load_from_actions_subdir(self, plugin_name: str, plugin_actions_path: str, plugin_actions_dir: str) -> int:
"""从actions子目录加载动作"""
loaded_count = 0
try:
# 尝试导入插件的actions包
actions_module = importlib.import_module(plugin_actions_path)
logger.debug(f"成功加载插件动作模块: {plugin_actions_path}")
# 遍历actions目录中的所有Python文件
actions_dir = os.path.dirname(actions_module.__file__)
for file in os.listdir(actions_dir):
if file.endswith('.py') and file != '__init__.py':
action_module_name = f"{plugin_actions_path}.{file[:-3]}"
try:
importlib.import_module(action_module_name)
logger.info(f"成功加载动作: {action_module_name}")
loaded_count += 1
except Exception as e:
logger.error(f"加载动作失败: {action_module_name}, 错误: {e}")
except ImportError as e:
logger.debug(f"插件 {plugin_name} 的actions子包导入失败: {e}")
return loaded_count
def _load_from_commands_subdir(self, plugin_name: str, plugin_commands_path: str, plugin_commands_dir: str) -> int:
"""从commands子目录加载命令"""
loaded_count = 0
try:
# 尝试导入插件的commands包
commands_module = importlib.import_module(plugin_commands_path)
logger.debug(f"成功加载插件命令模块: {plugin_commands_path}")
# 遍历commands目录中的所有Python文件
commands_dir = os.path.dirname(commands_module.__file__)
for file in os.listdir(commands_dir):
if file.endswith('.py') and file != '__init__.py':
command_module_name = f"{plugin_commands_path}.{file[:-3]}"
try:
importlib.import_module(command_module_name)
logger.info(f"成功加载命令: {command_module_name}")
loaded_count += 1
except Exception as e:
logger.error(f"加载命令失败: {command_module_name}, 错误: {e}")
except ImportError as e:
logger.debug(f"插件 {plugin_name} 的commands子包导入失败: {e}")
return loaded_count
def _load_actions_from_root_dir(self, plugin_name: str) -> int:
"""从插件根目录加载动作文件"""
loaded_count = 0
try:
# 导入插件包本身
plugin_module = importlib.import_module(plugin_name)
logger.debug(f"尝试从插件根目录加载动作: {plugin_name}")
# 遍历插件根目录中的所有Python文件
plugin_dir = os.path.dirname(plugin_module.__file__)
for file in os.listdir(plugin_dir):
if file.endswith('.py') and file != '__init__.py':
# 跳过非动作文件(根据命名约定)
if not (file.endswith('_action.py') or file.endswith('_actions.py') or 'action' in file):
continue
action_module_name = f"{plugin_name}.{file[:-3]}"
try:
importlib.import_module(action_module_name)
logger.info(f"成功加载动作: {action_module_name}")
loaded_count += 1
except Exception as e:
logger.error(f"加载动作失败: {action_module_name}, 错误: {e}")
except ImportError as e:
logger.debug(f"插件 {plugin_name} 导入失败: {e}")
return loaded_count
def _load_commands_from_root_dir(self, plugin_name: str) -> int:
"""从插件根目录加载命令文件"""
loaded_count = 0
try:
# 导入插件包本身
plugin_module = importlib.import_module(plugin_name)
logger.debug(f"尝试从插件根目录加载命令: {plugin_name}")
# 遍历插件根目录中的所有Python文件
plugin_dir = os.path.dirname(plugin_module.__file__)
for file in os.listdir(plugin_dir):
if file.endswith('.py') and file != '__init__.py':
# 跳过非命令文件(根据命名约定)
if not (file.endswith('_command.py') or file.endswith('_commands.py') or 'command' in file):
continue
command_module_name = f"{plugin_name}.{file[:-3]}"
try:
importlib.import_module(command_module_name)
logger.info(f"成功加载命令: {command_module_name}")
loaded_count += 1
except Exception as e:
logger.error(f"加载命令失败: {command_module_name}, 错误: {e}")
except ImportError as e:
logger.debug(f"插件 {plugin_name} 导入失败: {e}")
return loaded_count
def _log_loading_stats(self) -> None:
"""输出加载统计信息"""
logger.success(f"插件加载完成: 总计 {self.loaded_actions} 个动作, {self.loaded_commands} 个命令")
if self.plugin_stats:
logger.info("插件加载详情:")
for plugin_name, stats in self.plugin_stats.items():
plugin_display_name = plugin_name.split('.')[-1] # 只显示插件名称,不显示完整路径
source_path = self.plugin_sources.get(plugin_name, "未知路径")
logger.info(f" {plugin_display_name} (来源: {source_path}): {stats['actions']} 动作, {stats['commands']} 命令")
# 创建全局插件加载器实例
plugin_loader = PluginLoader()