doc:完善doc

This commit is contained in:
SengokuCola
2025-06-19 23:21:31 +08:00
parent 98735b067c
commit 43425b3c1f
16 changed files with 2824 additions and 521 deletions

View File

@@ -2,157 +2,42 @@
> 欢迎来到MaiBot插件系统开发文档这里是你开始插件开发旅程的最佳起点。
## 🎯 快速导航
### 🌟 新手入门
## 新手入门
- [📖 快速开始指南](quick-start.md) - 5分钟创建你的第一个插件
## 组件功能详解
- [🧱 Action组件详解](action-components.md) - 掌握最核心的Action组件
- [💻 Command组件详解](command-components.md) - 学习直接响应命令的组件
- [⚙️ 配置管理指南](configuration-guide.md) - 学会使用配置驱动开发
### 📖 API参考
- [🔧 工具系统详解](tool-system.md) - 工具系统的使用和开发(非主要功能)
- [📡 消息API](api/message-api.md) - 消息发送接口
## API浏览
### 🔧 高级主题
### 🔗 核心通信API
- [📡 消息API](api/message-api.md) - 消息接收和处理接口
- [📤 发送API](api/send-api.md) - 各种类型消息发送接口
- [💬 聊天API](api/chat-api.md) - 聊天流管理和查询接口
- [📦 依赖管理系统](dependency-management.md) - Python包依赖管理
- [🔧 工具系统详解](tool-system.md) - 工具系统的使用和开发
### 🤖 AI与生成API
- [🧠 LLM API](api/llm-api.md) - 大语言模型交互接口
- [✨ 回复生成器API](api/generator-api.md) - 智能回复生成接口
- [😊 表情包API](api/emoji-api.md) - 表情包选择和管理接口
## 🔥 最新更新 (v2.0 新API格式)
### 📊 数据与配置API
- [🗄️ 数据库API](api/database-api.md) - 数据库操作接口
- [⚙️ 配置API](api/config-api.md) - 配置读取和用户信息接口
- [👤 个人信息API](api/person-api.md) - 用户信息查询接口
### 🎉 重大变更
### 🛠️ 工具API
- [🔧 工具API](api/utils-api.md) - 文件操作、时间处理等工具函数
1. **新API格式**
- 不再使用 `self.api`,改为直接方法调用
- `await self.send_text()` 替代旧的发送方式
- `await self.send_emoji()` 专门的表情发送方法
- `self.get_config()` 简化的配置访问
2. **replyer_1集成**
- 新增专用的 `generator_api` 模块
- 在Action中直接使用 `replyer_1` 生成个性化内容
- 支持多种生成风格和情感色彩
3. **更好的类型安全**
- 完整的类型注解支持
- 更清晰的返回值类型
- 更好的IDE支持
## 🚀 最佳学习路径
### 📚 初学者路径(推荐)
1. **基础入门**
```
快速开始指南 → Action组件详解 → Command组件详解
```
2. **API掌握**
```
消息API指南 → 配置管理指南
```
3. **高级功能**
```
依赖管理系统 → 工具系统详解
```
## 💡 核心概念速览
### 🧱 Action组件
- **用途**:增强麦麦的主动行为,让对话更自然
- **激活**关键词、LLM判断、随机等多种方式
- **新特性**支持replyer_1智能生成、更简洁的API
### 💻 Command组件
- **用途**:响应用户的明确指令,提供确定性功能
- **触发**:正则表达式匹配用户输入
- **特点**:即时响应、参数解析、拦截控制
### ⚙️ 配置系统
- **Schema驱动**使用ConfigField定义配置结构
- **类型安全**:强类型配置验证
- **嵌套访问**:支持 `section.key` 形式访问
### 🧠 replyer_1集成
- **智能生成**AI驱动的个性化内容生成
- **简单易用**:通过 `generator_api` 轻松调用
- **灵活配置**:支持多种生成风格和参数
## 📋 开发清单
在开始开发之前,确保你已经:
- [ ] 阅读了[快速开始指南](quick-start.md)
- [ ] 了解了Action组件或Command组件
- [ ] 熟悉了[Action组件](action-components.md)或[Command组件](command-components.md)
- [ ] 查看了[配置管理](configuration-guide.md)
开发完成后,请检查:
- [ ] 使用了新的API格式`self.send_text()`等)
- [ ] 正确配置了Schema和ConfigField
- [ ] 添加了适当的错误处理
- [ ] 测试了所有功能路径
## 🤝 获取帮助
### 📖 文档问题
如果你在文档中发现错误或需要补充,请:
> 如果你在文档中发现错误或需要补充,请
1. 检查最新的文档版本
2. 查看相关示例代码
3. 参考其他类似插件
### 💻 开发问题
遇到开发问题时:
1. 查看现有插件示例
2. 检查配置是否正确
3. 参考API文档
### 🎯 最佳实践建议
为了创建高质量的插件:
1. 始终使用新的API格式
2. 充分利用replyer_1的智能生成能力
3. 设计配置驱动的功能
4. 实现完善的错误处理
5. 编写清晰的文档注释
---
## 🌟 推荐插件示例
### 🎯 新手友好
- **Hello World插件**展示基础API使用
- **简单计算器**Command组件入门
- **智能问候**Action组件和replyer_1集成
### 🔧 实用工具
- **智能聊天助手**完整的replyer_1集成示例
- **用户管理系统**:配置驱动的复杂功能
- **定时提醒插件**:状态管理和持久化
### 🚀 高级应用
- **多功能聊天助手**:综合功能展示
- **游戏管理插件**:复杂状态管理
- **数据分析插件**:外部服务集成
---
**🎉 准备好开始了吗?从[快速开始指南](quick-start.md)开始你的插件开发之旅!**
使用新的API格式你可以创建更强大、更智能、更易维护的插件。让我们一起构建更好的MaiBot生态系统
4. 提交文档仓库issue

View File

@@ -373,197 +373,3 @@ class SurpriseAction(BaseAction):
await self.send_emoji(selected)
return True, f"发送了惊喜表情: {selected}"
```
## 💡 完整示例
### 智能聊天Action
```python
from src.plugin_system.apis import generator_api, emoji_api
class IntelligentChatAction(BaseAction):
"""智能聊天Action - 展示新API的完整用法"""
# 激活设置
focus_activation_type = ActionActivationType.ALWAYS
normal_activation_type = ActionActivationType.LLM_JUDGE
mode_enable = ChatMode.ALL
parallel_action = False
# 基本信息
action_name = "intelligent_chat"
action_description = "使用replyer_1进行智能聊天回复支持表情包和个性化回复"
# LLM判断提示词
llm_judge_prompt = """
判断是否需要进行智能聊天回复:
1. 用户提出了有趣的话题
2. 需要更加个性化的回复
3. 适合发送表情包的情况
请回答"是"或"否"。
"""
# 功能定义
action_parameters = {
"topic": "聊天话题",
"mood": "当前氛围happy/sad/excited/calm",
"include_emoji": "是否包含表情包true/false"
}
action_require = [
"需要更个性化回复时使用",
"聊天氛围适合发送表情时使用",
"避免在正式场合使用"
]
associated_types = ["text", "emoji"]
async def execute(self) -> Tuple[bool, str]:
# 获取参数
topic = self.action_data.get("topic", "日常聊天")
mood = self.action_data.get("mood", "happy")
include_emoji = self.action_data.get("include_emoji", "true") == "true"
# 构建智能回复数据
chat_data = {
"text": f"请针对{topic}话题进行回复,当前氛围是{mood}",
"topic": topic,
"mood": mood,
"style": "conversational",
"replyer_name": "replyer_1" # 使用replyer_1
}
# 生成智能回复
success, reply_set = await generator_api.generate_reply(
chat_stream=self.chat_stream,
action_data=chat_data,
platform=self.platform,
chat_id=self.chat_id,
is_group=self.is_group
)
reply_sent = False
if success and reply_set:
# 发送生成的回复
for reply_type, content in reply_set:
if reply_type == "text":
await self.send_text(content)
reply_sent = True
elif reply_type == "emoji":
await self.send_emoji(content)
# 如果配置允许且生成失败,发送表情包
if include_emoji and not reply_sent:
emoji_result = await emoji_api.get_by_description(mood)
if emoji_result:
emoji_base64, emoji_desc, matched_emotion = emoji_result
await self.send_emoji(emoji_base64)
reply_sent = True
# 记录动作执行
if reply_sent:
await self.store_action_info(
action_build_into_prompt=True,
action_prompt_display=f"进行了智能聊天回复,话题:{topic},氛围:{mood}",
action_done=True
)
return True, f"完成智能聊天回复:{topic}"
else:
return False, "智能回复生成失败"
```
## 🛠️ 调试技巧
### 开发调试Action
```python
class DebugAction(BaseAction):
"""调试Action - 展示如何调试新API"""
focus_activation_type = ActionActivationType.KEYWORD
normal_activation_type = ActionActivationType.KEYWORD
activation_keywords = ["debug", "调试"]
mode_enable = ChatMode.ALL
parallel_action = True
action_name = "debug_helper"
action_description = "调试助手,显示当前状态信息"
action_parameters = {}
action_require = ["需要调试信息时使用"]
associated_types = ["text"]
async def execute(self) -> Tuple[bool, str]:
# 收集调试信息
debug_info = {
"聊天类型": "群聊" if self.is_group else "私聊",
"平台": self.platform,
"目标ID": self.target_id,
"用户ID": self.user_id,
"用户昵称": self.user_nickname,
"动作数据": self.action_data,
}
if self.is_group:
debug_info.update({
"群ID": self.group_id,
"群名": self.group_name,
})
# 格式化调试信息
info_lines = ["🔍 调试信息:"]
for key, value in debug_info.items():
info_lines.append(f" • {key}: {value}")
debug_text = "\n".join(info_lines)
# 发送调试信息
await self.send_text(debug_text)
# 测试配置获取
test_config = self.get_config("debug.verbose", True)
if test_config:
await self.send_text(f"配置测试: debug.verbose = {test_config}")
return True, "调试信息已发送"
```
## 📚 最佳实践
1. **总是导入所需的API模块**
```python
from src.plugin_system.apis import generator_api, send_api, emoji_api
```
2. **在生成内容时指定replyer_1**
```python
action_data = {
"text": "生成内容的请求",
"replyer_name": "replyer_1"
}
```
3. **使用便捷发送方法**
```python
await self.send_text("文本") # 自动处理群聊/私聊
await self.send_emoji(emoji_base64)
```
4. **合理使用配置**
```python
enable_feature = self.get_config("section.key", default_value)
```
5. **总是记录动作信息**
```python
await self.store_action_info(
action_build_into_prompt=True,
action_prompt_display="动作描述",
action_done=True
)
```
通过使用新的API格式Action的开发变得更加简洁和强大

View File

@@ -0,0 +1,151 @@
# 聊天API
聊天API模块专门负责聊天信息的查询和管理帮助插件获取和管理不同的聊天流。
## 导入方式
```python
from src.plugin_system.apis import chat_api
# 或者
from src.plugin_system.apis.chat_api import ChatManager as chat
```
## 主要功能
### 1. 获取聊天流
#### `get_all_streams(platform: str = "qq") -> List[ChatStream]`
获取所有聊天流
**参数:**
- `platform`:平台筛选,默认为"qq"
**返回:**
- `List[ChatStream]`:聊天流列表
**示例:**
```python
streams = chat_api.get_all_streams()
for stream in streams:
print(f"聊天流ID: {stream.stream_id}")
```
#### `get_group_streams(platform: str = "qq") -> List[ChatStream]`
获取所有群聊聊天流
**参数:**
- `platform`:平台筛选,默认为"qq"
**返回:**
- `List[ChatStream]`:群聊聊天流列表
#### `get_private_streams(platform: str = "qq") -> List[ChatStream]`
获取所有私聊聊天流
**参数:**
- `platform`:平台筛选,默认为"qq"
**返回:**
- `List[ChatStream]`:私聊聊天流列表
### 2. 查找特定聊天流
#### `get_stream_by_group_id(group_id: str, platform: str = "qq") -> Optional[ChatStream]`
根据群ID获取聊天流
**参数:**
- `group_id`群聊ID
- `platform`:平台,默认为"qq"
**返回:**
- `Optional[ChatStream]`聊天流对象如果未找到返回None
**示例:**
```python
chat_stream = chat_api.get_stream_by_group_id("123456789")
if chat_stream:
print(f"找到群聊: {chat_stream.group_info.group_name}")
```
#### `get_stream_by_user_id(user_id: str, platform: str = "qq") -> Optional[ChatStream]`
根据用户ID获取私聊流
**参数:**
- `user_id`用户ID
- `platform`:平台,默认为"qq"
**返回:**
- `Optional[ChatStream]`聊天流对象如果未找到返回None
### 3. 聊天流信息查询
#### `get_stream_type(chat_stream: ChatStream) -> str`
获取聊天流类型
**参数:**
- `chat_stream`:聊天流对象
**返回:**
- `str`:聊天类型 ("group", "private", "unknown")
#### `get_stream_info(chat_stream: ChatStream) -> Dict[str, Any]`
获取聊天流详细信息
**参数:**
- `chat_stream`:聊天流对象
**返回:**
- `Dict[str, Any]`聊天流信息字典包含stream_id、platform、type等信息
**示例:**
```python
info = chat_api.get_stream_info(chat_stream)
print(f"聊天类型: {info['type']}")
print(f"平台: {info['platform']}")
if info['type'] == 'group':
print(f"群ID: {info['group_id']}")
print(f"群名: {info['group_name']}")
```
#### `get_streams_summary() -> Dict[str, int]`
获取聊天流统计信息
**返回:**
- `Dict[str, int]`:包含各平台群聊和私聊数量的统计字典
## 使用示例
### 基础用法
```python
from src.plugin_system.apis import chat_api
# 获取所有群聊
group_streams = chat_api.get_group_streams()
print(f"共有 {len(group_streams)} 个群聊")
# 查找特定群聊
target_group = chat_api.get_stream_by_group_id("123456789")
if target_group:
group_info = chat_api.get_stream_info(target_group)
print(f"群名: {group_info['group_name']}")
```
### 遍历所有聊天流
```python
# 获取所有聊天流并分类处理
all_streams = chat_api.get_all_streams()
for stream in all_streams:
stream_type = chat_api.get_stream_type(stream)
if stream_type == "group":
print(f"群聊: {stream.group_info.group_name}")
elif stream_type == "private":
print(f"私聊: {stream.user_info.user_nickname}")
```
## 注意事项
1. 所有函数都有错误处理,失败时会记录日志
2. 查询函数返回None或空列表时表示未找到结果
3. `platform`参数通常为"qq",也可能支持其他平台
4. `ChatStream`对象包含了聊天的完整信息,包括用户信息、群信息等

View File

@@ -0,0 +1,183 @@
# 配置API
配置API模块提供了配置读取和用户信息获取等功能让插件能够安全地访问全局配置和用户信息。
## 导入方式
```python
from src.plugin_system.apis import config_api
```
## 主要功能
### 1. 配置访问
#### `get_global_config(key: str, default: Any = None) -> Any`
安全地从全局配置中获取一个值
**参数:**
- `key`:配置键名,支持嵌套访问如 "section.subsection.key"
- `default`:如果配置不存在时返回的默认值
**返回:**
- `Any`:配置值或默认值
**示例:**
```python
# 获取机器人昵称
bot_name = config_api.get_global_config("bot.nickname", "MaiBot")
# 获取嵌套配置
llm_model = config_api.get_global_config("model.default.model_name", "gpt-3.5-turbo")
# 获取不存在的配置
unknown_config = config_api.get_global_config("unknown.config", "默认值")
```
#### `get_plugin_config(plugin_config: dict, key: str, default: Any = None) -> Any`
从插件配置中获取值,支持嵌套键访问
**参数:**
- `plugin_config`:插件配置字典
- `key`:配置键名,支持嵌套访问如 "section.subsection.key"
- `default`:如果配置不存在时返回的默认值
**返回:**
- `Any`:配置值或默认值
**示例:**
```python
# 在插件中使用
class MyPlugin(BasePlugin):
async def handle_action(self, action_data, chat_stream):
# 获取插件配置
api_key = config_api.get_plugin_config(self.config, "api.key", "")
timeout = config_api.get_plugin_config(self.config, "timeout", 30)
if not api_key:
logger.warning("API密钥未配置")
return False
```
### 2. 用户信息API
#### `get_user_id_by_person_name(person_name: str) -> tuple[str, str]`
根据用户名获取用户ID
**参数:**
- `person_name`:用户名
**返回:**
- `tuple[str, str]`(平台, 用户ID)
**示例:**
```python
platform, user_id = await config_api.get_user_id_by_person_name("张三")
if platform and user_id:
print(f"用户张三在{platform}平台的ID是{user_id}")
```
#### `get_person_info(person_id: str, key: str, default: Any = None) -> Any`
获取用户信息
**参数:**
- `person_id`用户ID
- `key`:信息键名
- `default`:默认值
**返回:**
- `Any`:用户信息值或默认值
**示例:**
```python
# 获取用户昵称
nickname = await config_api.get_person_info(person_id, "nickname", "未知用户")
# 获取用户印象
impression = await config_api.get_person_info(person_id, "impression", "")
```
## 使用示例
### 配置驱动的插件开发
```python
from src.plugin_system.apis import config_api
from src.plugin_system.base import BasePlugin
class WeatherPlugin(BasePlugin):
async def handle_action(self, action_data, chat_stream):
# 从全局配置获取API配置
api_endpoint = config_api.get_global_config("weather.api_endpoint", "")
default_city = config_api.get_global_config("weather.default_city", "北京")
# 从插件配置获取特定设置
api_key = config_api.get_plugin_config(self.config, "api_key", "")
timeout = config_api.get_plugin_config(self.config, "timeout", 10)
if not api_key:
return {"success": False, "message": "Weather API密钥未配置"}
# 使用配置进行天气查询...
return {"success": True, "message": f"{default_city}今天天气晴朗"}
```
### 用户信息查询
```python
async def get_user_by_name(user_name: str):
"""根据用户名获取完整的用户信息"""
# 获取用户的平台和ID
platform, user_id = await config_api.get_user_id_by_person_name(user_name)
if not platform or not user_id:
return None
# 构建person_id
from src.person_info.person_info import PersonInfoManager
person_id = PersonInfoManager.get_person_id(platform, user_id)
# 获取用户详细信息
nickname = await config_api.get_person_info(person_id, "nickname", user_name)
impression = await config_api.get_person_info(person_id, "impression", "")
return {
"platform": platform,
"user_id": user_id,
"nickname": nickname,
"impression": impression
}
```
## 配置键名说明
### 常用全局配置键
- `bot.nickname`:机器人昵称
- `bot.qq_account`机器人QQ号
- `model.default`默认LLM模型配置
- `database.path`:数据库路径
### 嵌套配置访问
配置支持点号分隔的嵌套访问:
```python
# config.toml 中的配置:
# [bot]
# nickname = "MaiBot"
# qq_account = "123456"
#
# [model.default]
# model_name = "gpt-3.5-turbo"
# temperature = 0.7
# API调用
bot_name = config_api.get_global_config("bot.nickname")
model_name = config_api.get_global_config("model.default.model_name")
temperature = config_api.get_global_config("model.default.temperature")
```
## 注意事项
1. **只读访问**配置API只提供读取功能插件不能修改全局配置
2. **异步函数**:用户信息相关的函数是异步的,需要使用`await`
3. **错误处理**:所有函数都有错误处理,失败时会记录日志并返回默认值
4. **安全性**插件通过此API访问配置是安全和隔离的
5. **性能**:频繁访问的配置建议在插件初始化时获取并缓存

View File

@@ -0,0 +1,258 @@
# 数据库API
数据库API模块提供通用的数据库操作功能支持查询、创建、更新和删除记录采用Peewee ORM模型。
## 导入方式
```python
from src.plugin_system.apis import database_api
```
## 主要功能
### 1. 通用数据库查询
#### `db_query(model_class, query_type="get", filters=None, data=None, limit=None, order_by=None, single_result=False)`
执行数据库查询操作的通用接口
**参数:**
- `model_class`Peewee模型类如ActionRecords、Messages等
- `query_type`:查询类型,可选值: "get", "create", "update", "delete", "count"
- `filters`:过滤条件字典,键为字段名,值为要匹配的值
- `data`:用于创建或更新的数据字典
- `limit`:限制结果数量
- `order_by`:排序字段列表,使用字段名,前缀'-'表示降序
- `single_result`:是否只返回单个结果
**返回:**
根据查询类型返回不同的结果:
- "get":返回查询结果列表或单个结果
- "create":返回创建的记录
- "update":返回受影响的行数
- "delete":返回受影响的行数
- "count":返回记录数量
### 2. 便捷查询函数
#### `db_save(model_class, data, key_field=None, key_value=None)`
保存数据到数据库(创建或更新)
**参数:**
- `model_class`Peewee模型类
- `data`:要保存的数据字典
- `key_field`:用于查找现有记录的字段名
- `key_value`:用于查找现有记录的字段值
**返回:**
- `Dict[str, Any]`保存后的记录数据失败时返回None
#### `db_get(model_class, filters=None, order_by=None, limit=None)`
简化的查询函数
**参数:**
- `model_class`Peewee模型类
- `filters`:过滤条件字典
- `order_by`:排序字段
- `limit`:限制结果数量
**返回:**
- `Union[List[Dict], Dict, None]`:查询结果
### 3. 专用函数
#### `store_action_info(...)`
存储动作信息的专用函数
## 使用示例
### 1. 基本查询操作
```python
from src.plugin_system.apis import database_api
from src.common.database.database_model import Messages, ActionRecords
# 查询最近10条消息
messages = await database_api.db_query(
Messages,
query_type="get",
filters={"chat_id": chat_stream.stream_id},
limit=10,
order_by=["-time"]
)
# 查询单条记录
message = await database_api.db_query(
Messages,
query_type="get",
filters={"message_id": "msg_123"},
single_result=True
)
```
### 2. 创建记录
```python
# 创建新的动作记录
new_record = await database_api.db_query(
ActionRecords,
query_type="create",
data={
"action_id": "action_123",
"time": time.time(),
"action_name": "TestAction",
"action_done": True
}
)
print(f"创建了记录: {new_record['id']}")
```
### 3. 更新记录
```python
# 更新动作状态
updated_count = await database_api.db_query(
ActionRecords,
query_type="update",
filters={"action_id": "action_123"},
data={"action_done": True, "completion_time": time.time()}
)
print(f"更新了 {updated_count} 条记录")
```
### 4. 删除记录
```python
# 删除过期记录
deleted_count = await database_api.db_query(
ActionRecords,
query_type="delete",
filters={"time__lt": time.time() - 86400} # 删除24小时前的记录
)
print(f"删除了 {deleted_count} 条过期记录")
```
### 5. 统计查询
```python
# 统计消息数量
message_count = await database_api.db_query(
Messages,
query_type="count",
filters={"chat_id": chat_stream.stream_id}
)
print(f"该聊天有 {message_count} 条消息")
```
### 6. 使用便捷函数
```python
# 使用db_save进行创建或更新
record = await database_api.db_save(
ActionRecords,
{
"action_id": "action_123",
"time": time.time(),
"action_name": "TestAction",
"action_done": True
},
key_field="action_id",
key_value="action_123"
)
# 使用db_get进行简单查询
recent_messages = await database_api.db_get(
Messages,
filters={"chat_id": chat_stream.stream_id},
order_by="-time",
limit=5
)
```
## 高级用法
### 复杂查询示例
```python
# 查询特定用户在特定时间段的消息
user_messages = await database_api.db_query(
Messages,
query_type="get",
filters={
"user_id": "123456",
"time__gte": start_time, # 大于等于开始时间
"time__lt": end_time # 小于结束时间
},
order_by=["-time"],
limit=50
)
# 批量处理
for message in user_messages:
print(f"消息内容: {message['plain_text']}")
print(f"发送时间: {message['time']}")
```
### 插件中的数据持久化
```python
from src.plugin_system.base import BasePlugin
from src.plugin_system.apis import database_api
class DataPlugin(BasePlugin):
async def handle_action(self, action_data, chat_stream):
# 保存插件数据
plugin_data = {
"plugin_name": self.plugin_name,
"chat_id": chat_stream.stream_id,
"data": json.dumps(action_data),
"created_time": time.time()
}
# 使用自定义表模型(需要先定义)
record = await database_api.db_save(
PluginData, # 假设的插件数据模型
plugin_data,
key_field="plugin_name",
key_value=self.plugin_name
)
return {"success": True, "record_id": record["id"]}
```
## 数据模型
### 常用模型类
系统提供了以下常用的数据模型:
- `Messages`:消息记录
- `ActionRecords`:动作记录
- `UserInfo`:用户信息
- `GroupInfo`:群组信息
### 字段说明
#### Messages模型主要字段
- `message_id`消息ID
- `chat_id`聊天ID
- `user_id`用户ID
- `plain_text`:纯文本内容
- `time`:时间戳
#### ActionRecords模型主要字段
- `action_id`动作ID
- `action_name`:动作名称
- `action_done`:是否完成
- `time`:创建时间
## 注意事项
1. **异步操作**所有数据库API都是异步的必须使用`await`
2. **错误处理**函数内置错误处理失败时返回None或空列表
3. **数据类型**:返回的都是字典格式的数据,不是模型对象
4. **性能考虑**:使用`limit`参数避免查询大量数据
5. **过滤条件**支持简单的等值过滤复杂查询需要使用原生Peewee语法
6. **事务**如需事务支持建议直接使用Peewee的事务功能

View File

@@ -0,0 +1,253 @@
# 表情包API
表情包API模块提供表情包的获取、查询和管理功能让插件能够智能地选择和使用表情包。
## 导入方式
```python
from src.plugin_system.apis import emoji_api
```
## 主要功能
### 1. 表情包获取
#### `get_by_description(description: str) -> Optional[Tuple[str, str, str]]`
根据场景描述选择表情包
**参数:**
- `description`:场景描述文本,例如"开心的大笑"、"轻微的讽刺"、"表示无奈和沮丧"等
**返回:**
- `Optional[Tuple[str, str, str]]`(base64编码, 表情包描述, 匹配的场景) 或 None
**示例:**
```python
emoji_result = await emoji_api.get_by_description("开心的大笑")
if emoji_result:
emoji_base64, description, matched_scene = emoji_result
print(f"获取到表情包: {description}, 场景: {matched_scene}")
# 可以将emoji_base64用于发送表情包
```
#### `get_random() -> Optional[Tuple[str, str, str]]`
随机获取表情包
**返回:**
- `Optional[Tuple[str, str, str]]`(base64编码, 表情包描述, 随机场景) 或 None
**示例:**
```python
random_emoji = await emoji_api.get_random()
if random_emoji:
emoji_base64, description, scene = random_emoji
print(f"随机表情包: {description}")
```
#### `get_by_emotion(emotion: str) -> Optional[Tuple[str, str, str]]`
根据场景关键词获取表情包
**参数:**
- `emotion`:场景关键词,如"大笑"、"讽刺"、"无奈"等
**返回:**
- `Optional[Tuple[str, str, str]]`(base64编码, 表情包描述, 匹配的场景) 或 None
**示例:**
```python
emoji_result = await emoji_api.get_by_emotion("讽刺")
if emoji_result:
emoji_base64, description, scene = emoji_result
# 发送讽刺表情包
```
### 2. 表情包信息查询
#### `get_count() -> int`
获取表情包数量
**返回:**
- `int`:当前可用的表情包数量
#### `get_info() -> dict`
获取表情包系统信息
**返回:**
- `dict`:包含表情包数量、最大数量等信息
**返回字典包含:**
- `current_count`:当前表情包数量
- `max_count`:最大表情包数量
- `available_emojis`:可用表情包数量
#### `get_emotions() -> list`
获取所有可用的场景关键词
**返回:**
- `list`:所有表情包的场景关键词列表(去重)
#### `get_descriptions() -> list`
获取所有表情包的描述列表
**返回:**
- `list`:所有表情包的描述文本列表
## 使用示例
### 1. 智能表情包选择
```python
from src.plugin_system.apis import emoji_api
async def send_emotion_response(message_text: str, chat_stream):
"""根据消息内容智能选择表情包回复"""
# 分析消息场景
if "哈哈" in message_text or "好笑" in message_text:
emoji_result = await emoji_api.get_by_description("开心的大笑")
elif "无语" in message_text or "算了" in message_text:
emoji_result = await emoji_api.get_by_description("表示无奈和沮丧")
elif "呵呵" in message_text or "是吗" in message_text:
emoji_result = await emoji_api.get_by_description("轻微的讽刺")
elif "生气" in message_text or "愤怒" in message_text:
emoji_result = await emoji_api.get_by_description("愤怒和不满")
else:
# 随机选择一个表情包
emoji_result = await emoji_api.get_random()
if emoji_result:
emoji_base64, description, scene = emoji_result
# 使用send_api发送表情包
from src.plugin_system.apis import send_api
success = await send_api.emoji_to_group(emoji_base64, chat_stream.group_info.group_id)
return success
return False
```
### 2. 表情包管理功能
```python
async def show_emoji_stats():
"""显示表情包统计信息"""
# 获取基本信息
count = emoji_api.get_count()
info = emoji_api.get_info()
scenes = emoji_api.get_emotions() # 实际返回的是场景关键词
stats = f"""
📊 表情包统计信息:
- 总数量: {count}
- 可用数量: {info['available_emojis']}
- 最大容量: {info['max_count']}
- 支持场景: {len(scenes)}
🎭 支持的场景关键词: {', '.join(scenes[:10])}{'...' if len(scenes) > 10 else ''}
"""
return stats
```
### 3. 表情包测试功能
```python
async def test_emoji_system():
"""测试表情包系统的各种功能"""
print("=== 表情包系统测试 ===")
# 测试场景描述查找
test_descriptions = ["开心的大笑", "轻微的讽刺", "表示无奈和沮丧", "愤怒和不满"]
for desc in test_descriptions:
result = await emoji_api.get_by_description(desc)
if result:
_, description, scene = result
print(f"✅ 场景'{desc}' -> {description} ({scene})")
else:
print(f"❌ 场景'{desc}' -> 未找到")
# 测试关键词查找
scenes = emoji_api.get_emotions()
if scenes:
test_scene = scenes[0]
result = await emoji_api.get_by_emotion(test_scene)
if result:
print(f"✅ 关键词'{test_scene}' -> 找到匹配表情包")
# 测试随机获取
random_result = await emoji_api.get_random()
if random_result:
print("✅ 随机获取 -> 成功")
print(f"📊 系统信息: {emoji_api.get_info()}")
```
### 4. 在Action中使用表情包
```python
from src.plugin_system.base import BaseAction
class EmojiAction(BaseAction):
async def execute(self, action_data, chat_stream):
# 从action_data获取场景描述或关键词
scene_keyword = action_data.get("scene", "")
scene_description = action_data.get("description", "")
emoji_result = None
# 优先使用具体的场景描述
if scene_description:
emoji_result = await emoji_api.get_by_description(scene_description)
# 其次使用场景关键词
elif scene_keyword:
emoji_result = await emoji_api.get_by_emotion(scene_keyword)
# 最后随机选择
else:
emoji_result = await emoji_api.get_random()
if emoji_result:
emoji_base64, description, scene = emoji_result
return {
"success": True,
"emoji_base64": emoji_base64,
"description": description,
"scene": scene
}
return {"success": False, "message": "未找到合适的表情包"}
```
## 场景描述说明
### 常用场景描述
表情包系统支持多种具体的场景描述,常见的包括:
- **开心类场景**:开心的大笑、满意的微笑、兴奋的手舞足蹈
- **无奈类场景**:表示无奈和沮丧、轻微的讽刺、无语的摇头
- **愤怒类场景**:愤怒和不满、生气的瞪视、暴躁的抓狂
- **惊讶类场景**:震惊的表情、意外的发现、困惑的思考
- **可爱类场景**:卖萌的表情、撒娇的动作、害羞的样子
### 场景关键词示例
系统支持的场景关键词包括:
- 大笑、微笑、兴奋、手舞足蹈
- 无奈、沮丧、讽刺、无语、摇头
- 愤怒、不满、生气、瞪视、抓狂
- 震惊、意外、困惑、思考
- 卖萌、撒娇、害羞、可爱
### 匹配机制
- **精确匹配**:优先匹配完整的场景描述,如"开心的大笑"
- **关键词匹配**:如果没有精确匹配,则根据关键词进行模糊匹配
- **语义匹配**:系统会理解场景的语义含义进行智能匹配
## 注意事项
1. **异步函数**:获取表情包的函数都是异步的,需要使用 `await`
2. **返回格式**表情包以base64编码返回可直接用于发送
3. **错误处理**所有函数都有错误处理失败时返回None或默认值
4. **使用统计**:系统会记录表情包的使用次数
5. **文件依赖**:表情包依赖于本地文件,确保表情包文件存在
6. **编码格式**返回的是base64编码的图片数据可直接用于网络传输
7. **场景理解**:系统能理解具体的场景描述,比简单的情感分类更准确

View File

@@ -0,0 +1,341 @@
# 回复生成器API
回复生成器API模块提供智能回复生成功能让插件能够使用系统的回复生成器来产生自然的聊天回复。
## 导入方式
```python
from src.plugin_system.apis import generator_api
```
## 主要功能
### 1. 回复器获取
#### `get_replyer(chat_stream=None, platform=None, chat_id=None, is_group=True)`
获取回复器对象
**参数:**
- `chat_stream`:聊天流对象(优先)
- `platform`:平台名称,如"qq"
- `chat_id`聊天ID群ID或用户ID
- `is_group`:是否为群聊
**返回:**
- `DefaultReplyer`回复器对象如果获取失败则返回None
**示例:**
```python
# 使用聊天流获取回复器
replyer = generator_api.get_replyer(chat_stream=chat_stream)
# 使用平台和ID获取回复器
replyer = generator_api.get_replyer(
platform="qq",
chat_id="123456789",
is_group=True
)
```
### 2. 回复生成
#### `generate_reply(chat_stream=None, action_data=None, platform=None, chat_id=None, is_group=True)`
生成回复
**参数:**
- `chat_stream`:聊天流对象(优先)
- `action_data`:动作数据
- `platform`:平台名称(备用)
- `chat_id`聊天ID备用
- `is_group`:是否为群聊(备用)
**返回:**
- `Tuple[bool, List[Tuple[str, Any]]]`(是否成功, 回复集合)
**示例:**
```python
success, reply_set = await generator_api.generate_reply(
chat_stream=chat_stream,
action_data={"message": "你好", "intent": "greeting"}
)
if success:
for reply_type, reply_content in reply_set:
print(f"回复类型: {reply_type}, 内容: {reply_content}")
```
#### `rewrite_reply(chat_stream=None, reply_data=None, platform=None, chat_id=None, is_group=True)`
重写回复
**参数:**
- `chat_stream`:聊天流对象(优先)
- `reply_data`:回复数据
- `platform`:平台名称(备用)
- `chat_id`聊天ID备用
- `is_group`:是否为群聊(备用)
**返回:**
- `Tuple[bool, List[Tuple[str, Any]]]`(是否成功, 回复集合)
**示例:**
```python
success, reply_set = await generator_api.rewrite_reply(
chat_stream=chat_stream,
reply_data={"original_text": "原始回复", "style": "more_friendly"}
)
```
## 使用示例
### 1. 基础回复生成
```python
from src.plugin_system.apis import generator_api
async def generate_greeting_reply(chat_stream, user_name):
"""生成问候回复"""
action_data = {
"intent": "greeting",
"user_name": user_name,
"context": "morning_greeting"
}
success, reply_set = await generator_api.generate_reply(
chat_stream=chat_stream,
action_data=action_data
)
if success and reply_set:
# 获取第一个回复
reply_type, reply_content = reply_set[0]
return reply_content
return "你好!" # 默认回复
```
### 2. 在Action中使用回复生成器
```python
from src.plugin_system.base import BaseAction
class ChatAction(BaseAction):
async def execute(self, action_data, chat_stream):
# 准备回复数据
reply_context = {
"message_type": "response",
"user_input": action_data.get("user_message", ""),
"intent": action_data.get("intent", ""),
"entities": action_data.get("entities", {}),
"context": self.get_conversation_context(chat_stream)
}
# 生成回复
success, reply_set = await generator_api.generate_reply(
chat_stream=chat_stream,
action_data=reply_context
)
if success:
return {
"success": True,
"replies": reply_set,
"generated_count": len(reply_set)
}
return {
"success": False,
"error": "回复生成失败",
"fallback_reply": "抱歉,我现在无法理解您的消息。"
}
```
### 3. 多样化回复生成
```python
async def generate_diverse_replies(chat_stream, topic, count=3):
"""生成多个不同风格的回复"""
styles = ["formal", "casual", "humorous"]
all_replies = []
for i, style in enumerate(styles[:count]):
action_data = {
"topic": topic,
"style": style,
"variation": i
}
success, reply_set = await generator_api.generate_reply(
chat_stream=chat_stream,
action_data=action_data
)
if success and reply_set:
all_replies.extend(reply_set)
return all_replies
```
### 4. 回复重写功能
```python
async def improve_reply(chat_stream, original_reply, improvement_type="more_friendly"):
"""改进原始回复"""
reply_data = {
"original_text": original_reply,
"improvement_type": improvement_type,
"target_audience": "young_users",
"tone": "positive"
}
success, improved_replies = await generator_api.rewrite_reply(
chat_stream=chat_stream,
reply_data=reply_data
)
if success and improved_replies:
# 返回改进后的第一个回复
_, improved_content = improved_replies[0]
return improved_content
return original_reply # 如果改进失败,返回原始回复
```
### 5. 条件回复生成
```python
async def conditional_reply_generation(chat_stream, user_message, user_emotion):
"""根据用户情感生成条件回复"""
# 根据情感调整回复策略
if user_emotion == "sad":
action_data = {
"intent": "comfort",
"tone": "empathetic",
"style": "supportive"
}
elif user_emotion == "angry":
action_data = {
"intent": "calm",
"tone": "peaceful",
"style": "understanding"
}
else:
action_data = {
"intent": "respond",
"tone": "neutral",
"style": "helpful"
}
action_data["user_message"] = user_message
action_data["user_emotion"] = user_emotion
success, reply_set = await generator_api.generate_reply(
chat_stream=chat_stream,
action_data=action_data
)
return reply_set if success else []
```
## 回复集合格式
### 回复类型
生成的回复集合包含多种类型的回复:
- `"text"`:纯文本回复
- `"emoji"`:表情包回复
- `"image"`:图片回复
- `"mixed"`:混合类型回复
### 回复集合结构
```python
# 示例回复集合
reply_set = [
("text", "很高兴见到你!"),
("emoji", "emoji_base64_data"),
("text", "有什么可以帮助你的吗?")
]
```
## 高级用法
### 1. 自定义回复器配置
```python
async def generate_with_custom_config(chat_stream, action_data):
"""使用自定义配置生成回复"""
# 获取回复器
replyer = generator_api.get_replyer(chat_stream=chat_stream)
if replyer:
# 可以访问回复器的内部方法
success, reply_set = await replyer.generate_reply_with_context(
reply_data=action_data,
# 可以传递额外的配置参数
)
return success, reply_set
return False, []
```
### 2. 回复质量评估
```python
async def generate_and_evaluate_replies(chat_stream, action_data):
"""生成回复并评估质量"""
success, reply_set = await generator_api.generate_reply(
chat_stream=chat_stream,
action_data=action_data
)
if success:
evaluated_replies = []
for reply_type, reply_content in reply_set:
# 简单的质量评估
quality_score = evaluate_reply_quality(reply_content)
evaluated_replies.append({
"type": reply_type,
"content": reply_content,
"quality": quality_score
})
# 按质量排序
evaluated_replies.sort(key=lambda x: x["quality"], reverse=True)
return evaluated_replies
return []
def evaluate_reply_quality(reply_content):
"""简单的回复质量评估"""
if not reply_content:
return 0
score = 50 # 基础分
# 长度适中加分
if 5 <= len(reply_content) <= 100:
score += 20
# 包含积极词汇加分
positive_words = ["好", "棒", "不错", "感谢", "开心"]
for word in positive_words:
if word in reply_content:
score += 10
break
return min(score, 100)
```
## 注意事项
1. **异步操作**:所有生成函数都是异步的,必须使用`await`
2. **错误处理**函数内置错误处理失败时返回False和空列表
3. **聊天流依赖**:需要有效的聊天流对象才能正常工作
4. **性能考虑**回复生成可能需要一些时间特别是使用LLM时
5. **回复格式**:返回的回复集合是元组列表,包含类型和内容
6. **上下文感知**:生成器会考虑聊天上下文和历史消息

244
docs/plugins/api/llm-api.md Normal file
View File

@@ -0,0 +1,244 @@
# LLM API
LLM API模块提供与大语言模型交互的功能让插件能够使用系统配置的LLM模型进行内容生成。
## 导入方式
```python
from src.plugin_system.apis import llm_api
```
## 主要功能
### 1. 模型管理
#### `get_available_models() -> Dict[str, Any]`
获取所有可用的模型配置
**返回:**
- `Dict[str, Any]`模型配置字典key为模型名称value为模型配置
**示例:**
```python
models = llm_api.get_available_models()
for model_name, model_config in models.items():
print(f"模型: {model_name}")
print(f"配置: {model_config}")
```
### 2. 内容生成
#### `generate_with_model(prompt, model_config, request_type="plugin.generate", **kwargs)`
使用指定模型生成内容
**参数:**
- `prompt`:提示词
- `model_config`:模型配置(从 get_available_models 获取)
- `request_type`:请求类型标识
- `**kwargs`其他模型特定参数如temperature、max_tokens等
**返回:**
- `Tuple[bool, str, str, str]`(是否成功, 生成的内容, 推理过程, 模型名称)
**示例:**
```python
models = llm_api.get_available_models()
default_model = models.get("default")
if default_model:
success, response, reasoning, model_name = await llm_api.generate_with_model(
prompt="请写一首关于春天的诗",
model_config=default_model,
temperature=0.7,
max_tokens=200
)
if success:
print(f"生成内容: {response}")
print(f"使用模型: {model_name}")
```
## 使用示例
### 1. 基础文本生成
```python
from src.plugin_system.apis import llm_api
async def generate_story(topic: str):
"""生成故事"""
models = llm_api.get_available_models()
model = models.get("default")
if not model:
return "未找到可用模型"
prompt = f"请写一个关于{topic}的短故事大约100字左右。"
success, story, reasoning, model_name = await llm_api.generate_with_model(
prompt=prompt,
model_config=model,
request_type="story.generate",
temperature=0.8,
max_tokens=150
)
return story if success else "故事生成失败"
```
### 2. 在Action中使用LLM
```python
from src.plugin_system.base import BaseAction
class LLMAction(BaseAction):
async def execute(self, action_data, chat_stream):
# 获取用户输入
user_input = action_data.get("user_message", "")
intent = action_data.get("intent", "chat")
# 获取模型配置
models = llm_api.get_available_models()
model = models.get("default")
if not model:
return {"success": False, "error": "未配置LLM模型"}
# 构建提示词
prompt = self.build_prompt(user_input, intent)
# 生成回复
success, response, reasoning, model_name = await llm_api.generate_with_model(
prompt=prompt,
model_config=model,
request_type=f"plugin.{self.plugin_name}",
temperature=0.7
)
if success:
return {
"success": True,
"response": response,
"model_used": model_name,
"reasoning": reasoning
}
return {"success": False, "error": response}
def build_prompt(self, user_input: str, intent: str) -> str:
"""构建提示词"""
base_prompt = "你是一个友善的AI助手。"
if intent == "question":
return f"{base_prompt}\n\n用户问题:{user_input}\n\n请提供准确、有用的回答:"
elif intent == "chat":
return f"{base_prompt}\n\n用户说:{user_input}\n\n请进行自然的对话:"
else:
return f"{base_prompt}\n\n用户输入:{user_input}\n\n请回复:"
```
### 3. 多模型对比
```python
async def compare_models(prompt: str):
"""使用多个模型生成内容并对比"""
models = llm_api.get_available_models()
results = {}
for model_name, model_config in models.items():
success, response, reasoning, actual_model = await llm_api.generate_with_model(
prompt=prompt,
model_config=model_config,
request_type="comparison.test"
)
results[model_name] = {
"success": success,
"response": response,
"model": actual_model,
"reasoning": reasoning
}
return results
```
### 4. 智能对话插件
```python
class ChatbotPlugin(BasePlugin):
async def handle_action(self, action_data, chat_stream):
user_message = action_data.get("message", "")
# 获取历史对话上下文
context = self.get_conversation_context(chat_stream)
# 构建对话提示词
prompt = self.build_conversation_prompt(user_message, context)
# 获取模型配置
models = llm_api.get_available_models()
chat_model = models.get("chat", models.get("default"))
if not chat_model:
return {"success": False, "message": "聊天模型未配置"}
# 生成回复
success, response, reasoning, model_name = await llm_api.generate_with_model(
prompt=prompt,
model_config=chat_model,
request_type="chat.conversation",
temperature=0.8,
max_tokens=500
)
if success:
# 保存对话历史
self.save_conversation(chat_stream, user_message, response)
return {
"success": True,
"reply": response,
"model": model_name
}
return {"success": False, "message": "回复生成失败"}
def build_conversation_prompt(self, user_message: str, context: list) -> str:
"""构建对话提示词"""
prompt = "你是一个有趣、友善的聊天机器人。请自然地回复用户的消息。\n\n"
# 添加历史对话
if context:
prompt += "对话历史:\n"
for msg in context[-5:]: # 只保留最近5条
prompt += f"用户: {msg['user']}\n机器人: {msg['bot']}\n"
prompt += "\n"
prompt += f"用户: {user_message}\n机器人: "
return prompt
```
## 模型配置说明
### 常用模型类型
- `default`:默认模型
- `chat`:聊天专用模型
- `creative`:创意生成模型
- `code`:代码生成模型
### 配置参数
LLM模型支持的常用参数
- `temperature`控制输出随机性0.0-1.0
- `max_tokens`:最大生成长度
- `top_p`:核采样参数
- `frequency_penalty`:频率惩罚
- `presence_penalty`:存在惩罚
## 注意事项
1. **异步操作**LLM生成是异步的必须使用`await`
2. **错误处理**生成失败时返回False和错误信息
3. **配置依赖**:需要正确配置模型才能使用
4. **请求类型**建议为不同用途设置不同的request_type
5. **性能考虑**LLM调用可能较慢考虑超时和缓存
6. **成本控制**注意控制max_tokens以控制成本

View File

@@ -0,0 +1,342 @@
# 个人信息API
个人信息API模块提供用户信息查询和管理功能让插件能够获取和使用用户的相关信息。
## 导入方式
```python
from src.plugin_system.apis import person_api
```
## 主要功能
### 1. Person ID管理
#### `get_person_id(platform: str, user_id: int) -> str`
根据平台和用户ID获取person_id
**参数:**
- `platform`:平台名称,如 "qq", "telegram" 等
- `user_id`用户ID
**返回:**
- `str`唯一的person_idMD5哈希值
**示例:**
```python
person_id = person_api.get_person_id("qq", 123456)
print(f"Person ID: {person_id}")
```
### 2. 用户信息查询
#### `get_person_value(person_id: str, field_name: str, default: Any = None) -> Any`
根据person_id和字段名获取某个值
**参数:**
- `person_id`用户的唯一标识ID
- `field_name`:要获取的字段名,如 "nickname", "impression" 等
- `default`:当字段不存在或获取失败时返回的默认值
**返回:**
- `Any`:字段值或默认值
**示例:**
```python
nickname = await person_api.get_person_value(person_id, "nickname", "未知用户")
impression = await person_api.get_person_value(person_id, "impression")
```
#### `get_person_values(person_id: str, field_names: list, default_dict: dict = None) -> dict`
批量获取用户信息字段值
**参数:**
- `person_id`用户的唯一标识ID
- `field_names`:要获取的字段名列表
- `default_dict`:默认值字典,键为字段名,值为默认值
**返回:**
- `dict`:字段名到值的映射字典
**示例:**
```python
values = await person_api.get_person_values(
person_id,
["nickname", "impression", "know_times"],
{"nickname": "未知用户", "know_times": 0}
)
```
### 3. 用户状态查询
#### `is_person_known(platform: str, user_id: int) -> bool`
判断是否认识某个用户
**参数:**
- `platform`:平台名称
- `user_id`用户ID
**返回:**
- `bool`:是否认识该用户
**示例:**
```python
known = await person_api.is_person_known("qq", 123456)
if known:
print("这个用户我认识")
```
### 4. 用户名查询
#### `get_person_id_by_name(person_name: str) -> str`
根据用户名获取person_id
**参数:**
- `person_name`:用户名
**返回:**
- `str`person_id如果未找到返回空字符串
**示例:**
```python
person_id = person_api.get_person_id_by_name("张三")
if person_id:
print(f"找到用户: {person_id}")
```
## 使用示例
### 1. 基础用户信息获取
```python
from src.plugin_system.apis import person_api
async def get_user_info(platform: str, user_id: int):
"""获取用户基本信息"""
# 获取person_id
person_id = person_api.get_person_id(platform, user_id)
# 获取用户信息
user_info = await person_api.get_person_values(
person_id,
["nickname", "impression", "know_times", "last_seen"],
{
"nickname": "未知用户",
"impression": "",
"know_times": 0,
"last_seen": 0
}
)
return {
"person_id": person_id,
"nickname": user_info["nickname"],
"impression": user_info["impression"],
"know_times": user_info["know_times"],
"last_seen": user_info["last_seen"]
}
```
### 2. 在Action中使用用户信息
```python
from src.plugin_system.base import BaseAction
class PersonalizedAction(BaseAction):
async def execute(self, action_data, chat_stream):
# 获取发送者信息
user_id = chat_stream.user_info.user_id
platform = chat_stream.platform
# 获取person_id
person_id = person_api.get_person_id(platform, user_id)
# 获取用户昵称和印象
nickname = await person_api.get_person_value(person_id, "nickname", "朋友")
impression = await person_api.get_person_value(person_id, "impression", "")
# 根据用户信息个性化回复
if impression:
response = f"你好 {nickname}!根据我对你的了解:{impression}"
else:
response = f"你好 {nickname}!很高兴见到你。"
return {
"success": True,
"response": response,
"user_info": {
"nickname": nickname,
"impression": impression
}
}
```
### 3. 用户识别和欢迎
```python
async def welcome_user(chat_stream):
"""欢迎用户,区分新老用户"""
user_id = chat_stream.user_info.user_id
platform = chat_stream.platform
# 检查是否认识这个用户
is_known = await person_api.is_person_known(platform, user_id)
if is_known:
# 老用户,获取详细信息
person_id = person_api.get_person_id(platform, user_id)
nickname = await person_api.get_person_value(person_id, "nickname", "老朋友")
know_times = await person_api.get_person_value(person_id, "know_times", 0)
welcome_msg = f"欢迎回来,{nickname}!我们已经聊过 {know_times} 次了。"
else:
# 新用户
welcome_msg = "你好很高兴认识你我是MaiBot。"
return welcome_msg
```
### 4. 用户搜索功能
```python
async def find_user_by_name(name: str):
"""根据名字查找用户"""
person_id = person_api.get_person_id_by_name(name)
if not person_id:
return {"found": False, "message": f"未找到名为 '{name}' 的用户"}
# 获取用户详细信息
user_info = await person_api.get_person_values(
person_id,
["nickname", "platform", "user_id", "impression", "know_times"],
{}
)
return {
"found": True,
"person_id": person_id,
"info": user_info
}
```
### 5. 用户印象分析
```python
async def analyze_user_relationship(chat_stream):
"""分析用户关系"""
user_id = chat_stream.user_info.user_id
platform = chat_stream.platform
person_id = person_api.get_person_id(platform, user_id)
# 获取关系相关信息
relationship_info = await person_api.get_person_values(
person_id,
["nickname", "impression", "know_times", "relationship_level", "last_interaction"],
{
"nickname": "未知",
"impression": "",
"know_times": 0,
"relationship_level": "stranger",
"last_interaction": 0
}
)
# 分析关系程度
know_times = relationship_info["know_times"]
if know_times == 0:
relationship = "陌生人"
elif know_times < 5:
relationship = "新朋友"
elif know_times < 20:
relationship = "熟人"
else:
relationship = "老朋友"
return {
"nickname": relationship_info["nickname"],
"relationship": relationship,
"impression": relationship_info["impression"],
"interaction_count": know_times
}
```
## 常用字段说明
### 基础信息字段
- `nickname`:用户昵称
- `platform`:平台信息
- `user_id`用户ID
### 关系信息字段
- `impression`:对用户的印象
- `know_times`:交互次数
- `relationship_level`:关系等级
- `last_seen`:最后见面时间
- `last_interaction`:最后交互时间
### 个性化字段
- `preferences`:用户偏好
- `interests`:兴趣爱好
- `mood_history`:情绪历史
- `topic_interests`:话题兴趣
## 最佳实践
### 1. 错误处理
```python
async def safe_get_user_info(person_id: str, field: str):
"""安全获取用户信息"""
try:
value = await person_api.get_person_value(person_id, field)
return value if value is not None else "未设置"
except Exception as e:
logger.error(f"获取用户信息失败: {e}")
return "获取失败"
```
### 2. 批量操作
```python
async def get_complete_user_profile(person_id: str):
"""获取完整用户档案"""
# 一次性获取所有需要的字段
fields = [
"nickname", "impression", "know_times",
"preferences", "interests", "relationship_level"
]
defaults = {
"nickname": "用户",
"impression": "",
"know_times": 0,
"preferences": "{}",
"interests": "[]",
"relationship_level": "stranger"
}
profile = await person_api.get_person_values(person_id, fields, defaults)
# 处理JSON字段
try:
profile["preferences"] = json.loads(profile["preferences"])
profile["interests"] = json.loads(profile["interests"])
except:
profile["preferences"] = {}
profile["interests"] = []
return profile
```
## 注意事项
1. **异步操作**:大部分查询函数都是异步的,需要使用`await`
2. **错误处理**:所有函数都有错误处理,失败时记录日志并返回默认值
3. **数据类型**返回的数据可能是字符串、数字或JSON需要适当处理
4. **性能考虑**:批量查询优于单个查询
5. **隐私保护**:确保用户信息的使用符合隐私政策
6. **数据一致性**person_id是用户的唯一标识应妥善保存和使用

View File

@@ -0,0 +1,368 @@
# 消息发送API
消息发送API模块专门负责发送各种类型的消息支持文本、表情包、图片等多种消息类型。
## 导入方式
```python
from src.plugin_system.apis import send_api
```
## 主要功能
### 1. 文本消息发送
#### `text_to_group(text, group_id, platform="qq", typing=False, reply_to="", storage_message=True)`
向群聊发送文本消息
**参数:**
- `text`:要发送的文本内容
- `group_id`群聊ID
- `platform`:平台,默认为"qq"
- `typing`:是否显示正在输入
- `reply_to`:回复消息的格式,如"发送者:消息内容"
- `storage_message`:是否存储到数据库
**返回:**
- `bool`:是否发送成功
#### `text_to_user(text, user_id, platform="qq", typing=False, reply_to="", storage_message=True)`
向用户发送私聊文本消息
**参数与返回值同上**
### 2. 表情包发送
#### `emoji_to_group(emoji_base64, group_id, platform="qq", storage_message=True)`
向群聊发送表情包
**参数:**
- `emoji_base64`表情包的base64编码
- `group_id`群聊ID
- `platform`:平台,默认为"qq"
- `storage_message`:是否存储到数据库
#### `emoji_to_user(emoji_base64, user_id, platform="qq", storage_message=True)`
向用户发送表情包
### 3. 图片发送
#### `image_to_group(image_base64, group_id, platform="qq", storage_message=True)`
向群聊发送图片
#### `image_to_user(image_base64, user_id, platform="qq", storage_message=True)`
向用户发送图片
### 4. 命令发送
#### `command_to_group(command, group_id, platform="qq", storage_message=True)`
向群聊发送命令
#### `command_to_user(command, user_id, platform="qq", storage_message=True)`
向用户发送命令
### 5. 自定义消息发送
#### `custom_to_group(message_type, content, group_id, platform="qq", display_message="", typing=False, reply_to="", storage_message=True)`
向群聊发送自定义类型消息
#### `custom_to_user(message_type, content, user_id, platform="qq", display_message="", typing=False, reply_to="", storage_message=True)`
向用户发送自定义类型消息
#### `custom_message(message_type, content, target_id, is_group=True, platform="qq", display_message="", typing=False, reply_to="", storage_message=True)`
通用的自定义消息发送
**参数:**
- `message_type`:消息类型,如"text"、"image"、"emoji"等
- `content`:消息内容
- `target_id`目标ID群ID或用户ID
- `is_group`:是否为群聊
- `platform`:平台
- `display_message`:显示消息
- `typing`:是否显示正在输入
- `reply_to`:回复消息
- `storage_message`:是否存储
## 使用示例
### 1. 基础文本发送
```python
from src.plugin_system.apis import send_api
async def send_hello(chat_stream):
"""发送问候消息"""
if chat_stream.group_info:
# 群聊
success = await send_api.text_to_group(
text="大家好!",
group_id=chat_stream.group_info.group_id,
typing=True
)
else:
# 私聊
success = await send_api.text_to_user(
text="你好!",
user_id=chat_stream.user_info.user_id,
typing=True
)
return success
```
### 2. 回复特定消息
```python
async def reply_to_message(chat_stream, reply_text, original_sender, original_message):
"""回复特定消息"""
# 构建回复格式
reply_to = f"{original_sender}:{original_message}"
if chat_stream.group_info:
success = await send_api.text_to_group(
text=reply_text,
group_id=chat_stream.group_info.group_id,
reply_to=reply_to
)
else:
success = await send_api.text_to_user(
text=reply_text,
user_id=chat_stream.user_info.user_id,
reply_to=reply_to
)
return success
```
### 3. 发送表情包
```python
async def send_emoji_reaction(chat_stream, emotion):
"""根据情感发送表情包"""
from src.plugin_system.apis import emoji_api
# 获取表情包
emoji_result = await emoji_api.get_by_emotion(emotion)
if not emoji_result:
return False
emoji_base64, description, matched_emotion = emoji_result
# 发送表情包
if chat_stream.group_info:
success = await send_api.emoji_to_group(
emoji_base64=emoji_base64,
group_id=chat_stream.group_info.group_id
)
else:
success = await send_api.emoji_to_user(
emoji_base64=emoji_base64,
user_id=chat_stream.user_info.user_id
)
return success
```
### 4. 在Action中发送消息
```python
from src.plugin_system.base import BaseAction
class MessageAction(BaseAction):
async def execute(self, action_data, chat_stream):
message_type = action_data.get("type", "text")
content = action_data.get("content", "")
if message_type == "text":
success = await self.send_text(chat_stream, content)
elif message_type == "emoji":
success = await self.send_emoji(chat_stream, content)
elif message_type == "image":
success = await self.send_image(chat_stream, content)
else:
success = False
return {"success": success}
async def send_text(self, chat_stream, text):
if chat_stream.group_info:
return await send_api.text_to_group(text, chat_stream.group_info.group_id)
else:
return await send_api.text_to_user(text, chat_stream.user_info.user_id)
async def send_emoji(self, chat_stream, emoji_base64):
if chat_stream.group_info:
return await send_api.emoji_to_group(emoji_base64, chat_stream.group_info.group_id)
else:
return await send_api.emoji_to_user(emoji_base64, chat_stream.user_info.user_id)
async def send_image(self, chat_stream, image_base64):
if chat_stream.group_info:
return await send_api.image_to_group(image_base64, chat_stream.group_info.group_id)
else:
return await send_api.image_to_user(image_base64, chat_stream.user_info.user_id)
```
### 5. 批量发送消息
```python
async def broadcast_message(message: str, target_groups: list):
"""向多个群组广播消息"""
results = {}
for group_id in target_groups:
try:
success = await send_api.text_to_group(
text=message,
group_id=group_id,
typing=True
)
results[group_id] = success
except Exception as e:
results[group_id] = False
print(f"发送到群 {group_id} 失败: {e}")
return results
```
### 6. 智能消息发送
```python
async def smart_send(chat_stream, message_data):
"""智能发送不同类型的消息"""
message_type = message_data.get("type", "text")
content = message_data.get("content", "")
options = message_data.get("options", {})
# 根据聊天流类型选择发送方法
target_id = (chat_stream.group_info.group_id if chat_stream.group_info
else chat_stream.user_info.user_id)
is_group = chat_stream.group_info is not None
# 使用通用发送方法
success = await send_api.custom_message(
message_type=message_type,
content=content,
target_id=target_id,
is_group=is_group,
typing=options.get("typing", False),
reply_to=options.get("reply_to", ""),
display_message=options.get("display_message", "")
)
return success
```
## 消息类型说明
### 支持的消息类型
- `"text"`:纯文本消息
- `"emoji"`:表情包消息
- `"image"`:图片消息
- `"command"`:命令消息
- `"video"`:视频消息(如果支持)
- `"audio"`:音频消息(如果支持)
### 回复格式
回复消息使用格式:`"发送者:消息内容"``"发送者:消息内容"`
系统会自动查找匹配的原始消息并进行回复。
## 高级用法
### 1. 消息发送队列
```python
import asyncio
class MessageQueue:
def __init__(self):
self.queue = asyncio.Queue()
self.running = False
async def add_message(self, chat_stream, message_type, content, options=None):
"""添加消息到队列"""
message_item = {
"chat_stream": chat_stream,
"type": message_type,
"content": content,
"options": options or {}
}
await self.queue.put(message_item)
async def process_queue(self):
"""处理消息队列"""
self.running = True
while self.running:
try:
message_item = await asyncio.wait_for(self.queue.get(), timeout=1.0)
# 发送消息
success = await smart_send(
message_item["chat_stream"],
{
"type": message_item["type"],
"content": message_item["content"],
"options": message_item["options"]
}
)
# 标记任务完成
self.queue.task_done()
# 发送间隔
await asyncio.sleep(0.5)
except asyncio.TimeoutError:
continue
except Exception as e:
print(f"处理消息队列出错: {e}")
```
### 2. 消息模板系统
```python
class MessageTemplate:
def __init__(self):
self.templates = {
"welcome": "欢迎 {nickname} 加入群聊!",
"goodbye": "{nickname} 离开了群聊。",
"notification": "🔔 通知:{message}",
"error": "❌ 错误:{error_message}",
"success": "✅ 成功:{message}"
}
def format_message(self, template_name: str, **kwargs) -> str:
"""格式化消息模板"""
template = self.templates.get(template_name, "{message}")
return template.format(**kwargs)
async def send_template(self, chat_stream, template_name: str, **kwargs):
"""发送模板消息"""
message = self.format_message(template_name, **kwargs)
if chat_stream.group_info:
return await send_api.text_to_group(message, chat_stream.group_info.group_id)
else:
return await send_api.text_to_user(message, chat_stream.user_info.user_id)
# 使用示例
template_system = MessageTemplate()
await template_system.send_template(chat_stream, "welcome", nickname="张三")
```
## 注意事项
1. **异步操作**:所有发送函数都是异步的,必须使用`await`
2. **错误处理**发送失败时返回False成功时返回True
3. **发送频率**:注意控制发送频率,避免被平台限制
4. **内容限制**:注意平台对消息内容和长度的限制
5. **权限检查**:确保机器人有发送消息的权限
6. **编码格式**图片和表情包需要使用base64编码
7. **存储选项**:可以选择是否将发送的消息存储到数据库

View File

@@ -0,0 +1,435 @@
# 工具API
工具API模块提供了各种辅助功能包括文件操作、时间处理、唯一ID生成等常用工具函数。
## 导入方式
```python
from src.plugin_system.apis import utils_api
```
## 主要功能
### 1. 文件操作
#### `get_plugin_path(caller_frame=None) -> str`
获取调用者插件的路径
**参数:**
- `caller_frame`调用者的栈帧默认为None自动获取
**返回:**
- `str`:插件目录的绝对路径
**示例:**
```python
plugin_path = utils_api.get_plugin_path()
print(f"插件路径: {plugin_path}")
```
#### `read_json_file(file_path: str, default: Any = None) -> Any`
读取JSON文件
**参数:**
- `file_path`:文件路径,可以是相对于插件目录的路径
- `default`:如果文件不存在或读取失败时返回的默认值
**返回:**
- `Any`JSON数据或默认值
**示例:**
```python
# 读取插件配置文件
config = utils_api.read_json_file("config.json", {})
settings = utils_api.read_json_file("data/settings.json", {"enabled": True})
```
#### `write_json_file(file_path: str, data: Any, indent: int = 2) -> bool`
写入JSON文件
**参数:**
- `file_path`:文件路径,可以是相对于插件目录的路径
- `data`:要写入的数据
- `indent`JSON缩进
**返回:**
- `bool`:是否写入成功
**示例:**
```python
data = {"name": "test", "value": 123}
success = utils_api.write_json_file("output.json", data)
```
### 2. 时间相关
#### `get_timestamp() -> int`
获取当前时间戳
**返回:**
- `int`:当前时间戳(秒)
#### `format_time(timestamp: Optional[int] = None, format_str: str = "%Y-%m-%d %H:%M:%S") -> str`
格式化时间
**参数:**
- `timestamp`时间戳如果为None则使用当前时间
- `format_str`:时间格式字符串
**返回:**
- `str`:格式化后的时间字符串
#### `parse_time(time_str: str, format_str: str = "%Y-%m-%d %H:%M:%S") -> int`
解析时间字符串为时间戳
**参数:**
- `time_str`:时间字符串
- `format_str`:时间格式字符串
**返回:**
- `int`:时间戳(秒)
### 3. 其他工具
#### `generate_unique_id() -> str`
生成唯一ID
**返回:**
- `str`唯一ID
## 使用示例
### 1. 插件数据管理
```python
from src.plugin_system.apis import utils_api
class DataPlugin(BasePlugin):
def __init__(self):
self.plugin_path = utils_api.get_plugin_path()
self.data_file = "plugin_data.json"
self.load_data()
def load_data(self):
"""加载插件数据"""
default_data = {
"users": {},
"settings": {"enabled": True},
"stats": {"message_count": 0}
}
self.data = utils_api.read_json_file(self.data_file, default_data)
def save_data(self):
"""保存插件数据"""
return utils_api.write_json_file(self.data_file, self.data)
async def handle_action(self, action_data, chat_stream):
# 更新统计信息
self.data["stats"]["message_count"] += 1
self.data["stats"]["last_update"] = utils_api.get_timestamp()
# 保存数据
if self.save_data():
return {"success": True, "message": "数据已保存"}
else:
return {"success": False, "message": "数据保存失败"}
```
### 2. 日志记录系统
```python
class PluginLogger:
def __init__(self, plugin_name: str):
self.plugin_name = plugin_name
self.log_file = f"{plugin_name}_log.json"
self.logs = utils_api.read_json_file(self.log_file, [])
def log_event(self, event_type: str, message: str, data: dict = None):
"""记录事件"""
log_entry = {
"id": utils_api.generate_unique_id(),
"timestamp": utils_api.get_timestamp(),
"formatted_time": utils_api.format_time(),
"event_type": event_type,
"message": message,
"data": data or {}
}
self.logs.append(log_entry)
# 保持最新的100条记录
if len(self.logs) > 100:
self.logs = self.logs[-100:]
# 保存到文件
utils_api.write_json_file(self.log_file, self.logs)
def get_logs_by_type(self, event_type: str) -> list:
"""获取指定类型的日志"""
return [log for log in self.logs if log["event_type"] == event_type]
def get_recent_logs(self, count: int = 10) -> list:
"""获取最近的日志"""
return self.logs[-count:]
# 使用示例
logger = PluginLogger("my_plugin")
logger.log_event("user_action", "用户发送了消息", {"user_id": "123", "message": "hello"})
```
### 3. 配置管理系统
```python
class ConfigManager:
def __init__(self, config_file: str = "plugin_config.json"):
self.config_file = config_file
self.default_config = {
"enabled": True,
"debug": False,
"max_users": 100,
"response_delay": 1.0,
"features": {
"auto_reply": True,
"logging": True
}
}
self.config = self.load_config()
def load_config(self) -> dict:
"""加载配置"""
return utils_api.read_json_file(self.config_file, self.default_config)
def save_config(self) -> bool:
"""保存配置"""
return utils_api.write_json_file(self.config_file, self.config, indent=4)
def get(self, key: str, default=None):
"""获取配置值,支持嵌套访问"""
keys = key.split('.')
value = self.config
for k in keys:
if isinstance(value, dict) and k in value:
value = value[k]
else:
return default
return value
def set(self, key: str, value):
"""设置配置值,支持嵌套设置"""
keys = key.split('.')
config = self.config
for k in keys[:-1]:
if k not in config:
config[k] = {}
config = config[k]
config[keys[-1]] = value
def update_config(self, updates: dict):
"""批量更新配置"""
def deep_update(base, updates):
for key, value in updates.items():
if isinstance(value, dict) and key in base and isinstance(base[key], dict):
deep_update(base[key], value)
else:
base[key] = value
deep_update(self.config, updates)
# 使用示例
config = ConfigManager()
print(f"调试模式: {config.get('debug', False)}")
print(f"自动回复: {config.get('features.auto_reply', True)}")
config.set('features.new_feature', True)
config.save_config()
```
### 4. 缓存系统
```python
class PluginCache:
def __init__(self, cache_file: str = "plugin_cache.json", ttl: int = 3600):
self.cache_file = cache_file
self.ttl = ttl # 缓存过期时间(秒)
self.cache = self.load_cache()
def load_cache(self) -> dict:
"""加载缓存"""
return utils_api.read_json_file(self.cache_file, {})
def save_cache(self):
"""保存缓存"""
return utils_api.write_json_file(self.cache_file, self.cache)
def get(self, key: str):
"""获取缓存值"""
if key not in self.cache:
return None
item = self.cache[key]
current_time = utils_api.get_timestamp()
# 检查是否过期
if current_time - item["timestamp"] > self.ttl:
del self.cache[key]
return None
return item["value"]
def set(self, key: str, value):
"""设置缓存值"""
self.cache[key] = {
"value": value,
"timestamp": utils_api.get_timestamp()
}
self.save_cache()
def clear_expired(self):
"""清理过期缓存"""
current_time = utils_api.get_timestamp()
expired_keys = []
for key, item in self.cache.items():
if current_time - item["timestamp"] > self.ttl:
expired_keys.append(key)
for key in expired_keys:
del self.cache[key]
if expired_keys:
self.save_cache()
return len(expired_keys)
# 使用示例
cache = PluginCache(ttl=1800) # 30分钟过期
cache.set("user_data_123", {"name": "张三", "score": 100})
user_data = cache.get("user_data_123")
```
### 5. 时间处理工具
```python
class TimeHelper:
@staticmethod
def get_time_info():
"""获取当前时间的详细信息"""
timestamp = utils_api.get_timestamp()
return {
"timestamp": timestamp,
"datetime": utils_api.format_time(timestamp),
"date": utils_api.format_time(timestamp, "%Y-%m-%d"),
"time": utils_api.format_time(timestamp, "%H:%M:%S"),
"year": utils_api.format_time(timestamp, "%Y"),
"month": utils_api.format_time(timestamp, "%m"),
"day": utils_api.format_time(timestamp, "%d"),
"weekday": utils_api.format_time(timestamp, "%A")
}
@staticmethod
def time_ago(timestamp: int) -> str:
"""计算时间差"""
current = utils_api.get_timestamp()
diff = current - timestamp
if diff < 60:
return f"{diff}秒前"
elif diff < 3600:
return f"{diff // 60}分钟前"
elif diff < 86400:
return f"{diff // 3600}小时前"
else:
return f"{diff // 86400}天前"
@staticmethod
def parse_duration(duration_str: str) -> int:
"""解析时间段字符串,返回秒数"""
import re
pattern = r'(\d+)([smhd])'
matches = re.findall(pattern, duration_str.lower())
total_seconds = 0
for value, unit in matches:
value = int(value)
if unit == 's':
total_seconds += value
elif unit == 'm':
total_seconds += value * 60
elif unit == 'h':
total_seconds += value * 3600
elif unit == 'd':
total_seconds += value * 86400
return total_seconds
# 使用示例
time_info = TimeHelper.get_time_info()
print(f"当前时间: {time_info['datetime']}")
last_seen = 1699000000
print(f"最后见面: {TimeHelper.time_ago(last_seen)}")
duration = TimeHelper.parse_duration("1h30m") # 1小时30分钟 = 5400秒
```
## 最佳实践
### 1. 错误处理
```python
def safe_file_operation(file_path: str, data: dict):
"""安全的文件操作"""
try:
success = utils_api.write_json_file(file_path, data)
if not success:
logger.warning(f"文件写入失败: {file_path}")
return success
except Exception as e:
logger.error(f"文件操作出错: {e}")
return False
```
### 2. 路径处理
```python
import os
def get_data_path(filename: str) -> str:
"""获取数据文件的完整路径"""
plugin_path = utils_api.get_plugin_path()
data_dir = os.path.join(plugin_path, "data")
# 确保数据目录存在
os.makedirs(data_dir, exist_ok=True)
return os.path.join(data_dir, filename)
```
### 3. 定期清理
```python
async def cleanup_old_files():
"""清理旧文件"""
plugin_path = utils_api.get_plugin_path()
current_time = utils_api.get_timestamp()
for filename in os.listdir(plugin_path):
if filename.endswith('.tmp'):
file_path = os.path.join(plugin_path, filename)
file_time = os.path.getmtime(file_path)
# 删除超过24小时的临时文件
if current_time - file_time > 86400:
os.remove(file_path)
```
## 注意事项
1. **相对路径**:文件路径支持相对于插件目录的路径
2. **自动创建目录**:写入文件时会自动创建必要的目录
3. **错误处理**:所有函数都有错误处理,失败时返回默认值
4. **编码格式**文件读写使用UTF-8编码
5. **时间格式**:时间戳使用秒为单位
6. **JSON格式**JSON文件使用可读性好的缩进格式

View File

@@ -474,53 +474,6 @@ async def execute(self) -> Tuple[bool, str]:
注意:配置文件是自动生成的,不要手动创建!
```
## 🎯 你学会了什么
恭喜你刚刚从零开始创建了一个完整的MaiCore插件让我们回顾一下
### 核心概念
- **插件Plugin**: 包含多个功能组件的集合
- **Action组件**: 智能动作,由麦麦根据情境自动选择使用
- **Command组件**: 直接响应用户命令的功能
- **配置Schema**: 定义配置结构,系统自动生成配置文件
### 开发流程
1. ✅ 创建最简单的插件框架
2. ✅ 添加Action
3. ✅ 理解激活系统的工作原理
4. ✅ 尝试KEYWORD激活的Action进阶
5. ✅ 添加Command组件
6. ✅ 可选定义配置Schema
7. ✅ 测试完整功能
## 📚 进阶学习
现在你已经掌握了基础,可以继续深入学习:
1. **掌握更多Action功能** 📖 [Action组件详解](action-components.md)
- 学习不同的激活方式
- 了解Action的生命周期
- 掌握参数传递
2. **学会配置管理** ⚙️ [插件配置定义指南](configuration-guide.md)
- 定义配置Schema
- 自动生成配置文件
- 配置验证和类型检查
3. **深入Command系统** 📖 [Command组件详解](command-components.md)
- 复杂正则表达式
- 参数提取和处理
- 错误处理
4. **掌握API系统** 📖 [新API使用指南](examples/replyer_api_usage.md)
- replyer_1智能生成
- 高级消息处理
- 表情和媒体发送
祝你插件开发愉快!🎉
```

View File

@@ -1,8 +1,7 @@
from typing import List, Tuple, Type
from src.plugin_system import (
BasePlugin, register_plugin, BaseAction, BaseCommand,
ComponentInfo, ActionActivationType, ChatMode,
ConfigField
ComponentInfo, ActionActivationType, ConfigField
)
# ===== Action组件 =====

View File

@@ -2,10 +2,20 @@
发送API模块
专门负责发送各种类型的消息采用标准Python包设计模式
使用方式:
from src.plugin_system.apis import send_api
# 方式1直接使用stream_id推荐
await send_api.text_to_stream("hello", stream_id)
await send_api.emoji_to_stream(emoji_base64, stream_id)
await send_api.custom_to_stream("video", video_data, stream_id)
# 方式2使用群聊/私聊指定函数
await send_api.text_to_group("hello", "123456")
await send_api.emoji_to_group(emoji_base64, "123456")
await send_api.text_to_user("hello", "987654")
# 方式3使用通用custom_message函数
await send_api.custom_message("video", video_data, "123456", True)
"""
@@ -224,6 +234,96 @@ async def _find_reply_message(target_stream, reply_to: str) -> Optional[MessageR
# =============================================================================
async def text_to_stream(
text: str,
stream_id: str,
typing: bool = False,
reply_to: str = "",
storage_message: bool = True,
) -> bool:
"""向指定流发送文本消息
Args:
text: 要发送的文本内容
stream_id: 聊天流ID
typing: 是否显示正在输入
reply_to: 回复消息,格式为"发送者:消息内容"
storage_message: 是否存储消息到数据库
Returns:
bool: 是否发送成功
"""
return await _send_to_target("text", text, stream_id, "", typing, reply_to, storage_message)
async def emoji_to_stream(emoji_base64: str, stream_id: str, storage_message: bool = True) -> bool:
"""向指定流发送表情包
Args:
emoji_base64: 表情包的base64编码
stream_id: 聊天流ID
storage_message: 是否存储消息到数据库
Returns:
bool: 是否发送成功
"""
return await _send_to_target("emoji", emoji_base64, stream_id, "", typing=False, storage_message=storage_message)
async def image_to_stream(image_base64: str, stream_id: str, storage_message: bool = True) -> bool:
"""向指定流发送图片
Args:
image_base64: 图片的base64编码
stream_id: 聊天流ID
storage_message: 是否存储消息到数据库
Returns:
bool: 是否发送成功
"""
return await _send_to_target("image", image_base64, stream_id, "", typing=False, storage_message=storage_message)
async def command_to_stream(command: str, stream_id: str, storage_message: bool = True) -> bool:
"""向指定流发送命令
Args:
command: 命令
stream_id: 聊天流ID
storage_message: 是否存储消息到数据库
Returns:
bool: 是否发送成功
"""
return await _send_to_target("command", command, stream_id, "", typing=False, storage_message=storage_message)
async def custom_to_stream(
message_type: str,
content: str,
stream_id: str,
display_message: str = "",
typing: bool = False,
reply_to: str = "",
storage_message: bool = True,
) -> bool:
"""向指定流发送自定义类型消息
Args:
message_type: 消息类型,如"text""image""emoji""video""file"
content: 消息内容通常是base64编码或文本
stream_id: 聊天流ID
display_message: 显示消息
typing: 是否显示正在输入
reply_to: 回复消息,格式为"发送者:消息内容"
storage_message: 是否存储消息到数据库
Returns:
bool: 是否发送成功
"""
return await _send_to_target(message_type, content, stream_id, display_message, typing, reply_to, storage_message)
async def text_to_group(
text: str,
group_id: str,

View File

@@ -204,21 +204,19 @@ class BaseAction(ABC):
Args:
content: 文本内容
reply_to: 回复消息,格式为"发送者:消息内容"
Returns:
bool: 是否发送成功
"""
if not self.target_id or not self.platform:
logger.error(f"{self.log_prefix} 缺少发送消息所需的信息")
if not self.chat_id:
logger.error(f"{self.log_prefix} 缺少聊天ID")
return False
if self.is_group:
return await send_api.text_to_group(
text=content, group_id=self.target_id, platform=self.platform, reply_to=reply_to
)
else:
return await send_api.text_to_user(
text=content, user_id=self.target_id, platform=self.platform, reply_to=reply_to
return await send_api.text_to_stream(
text=content,
stream_id=self.chat_id,
reply_to=reply_to
)
async def send_emoji(self, emoji_base64: str) -> bool:
@@ -230,17 +228,11 @@ class BaseAction(ABC):
Returns:
bool: 是否发送成功
"""
# 导入send_api
from src.plugin_system.apis import send_api
if not self.target_id or not self.platform:
logger.error(f"{self.log_prefix} 缺少发送消息所需的信息")
if not self.chat_id:
logger.error(f"{self.log_prefix} 缺少聊天ID")
return False
if self.is_group:
return await send_api.emoji_to_group(emoji_base64, self.target_id, self.platform)
else:
return await send_api.emoji_to_user(emoji_base64, self.target_id, self.platform)
return await send_api.emoji_to_stream(emoji_base64, self.chat_id)
async def send_image(self, image_base64: str) -> bool:
"""发送图片
@@ -251,43 +243,34 @@ class BaseAction(ABC):
Returns:
bool: 是否发送成功
"""
# 导入send_api
from src.plugin_system.apis import send_api
if not self.target_id or not self.platform:
logger.error(f"{self.log_prefix} 缺少发送消息所需的信息")
if not self.chat_id:
logger.error(f"{self.log_prefix} 缺少聊天ID")
return False
if self.is_group:
return await send_api.image_to_group(image_base64, self.target_id, self.platform)
else:
return await send_api.image_to_user(image_base64, self.target_id, self.platform)
return await send_api.image_to_stream(image_base64, self.chat_id)
async def send_custom(self, message_type: str, content: str, typing: bool = False) -> bool:
async def send_custom(self, message_type: str, content: str, typing: bool = False, reply_to: str = "") -> bool:
"""发送自定义类型消息
Args:
message_type: 消息类型,如"video""file""audio"
content: 消息内容
typing: 是否显示正在输入
reply_to: 回复消息,格式为"发送者:消息内容"
Returns:
bool: 是否发送成功
"""
# 导入send_api
from src.plugin_system.apis import send_api
if not self.target_id or not self.platform:
logger.error(f"{self.log_prefix} 缺少发送消息所需的信息")
if not self.chat_id:
logger.error(f"{self.log_prefix} 缺少聊天ID")
return False
return await send_api.custom_message(
return await send_api.custom_to_stream(
message_type=message_type,
content=content,
target_id=self.target_id,
is_group=self.is_group,
platform=self.platform,
stream_id=self.chat_id,
typing=typing,
reply_to=reply_to,
)
async def store_action_info(
@@ -318,34 +301,28 @@ class BaseAction(ABC):
) -> bool:
"""发送命令消息
使用和send_text相同的方式通过MessageAPI发送命令
使用stream API发送命令
Args:
command_name: 命令名称
args: 命令参数
display_message: 显示消息
storage_message: 是否存储消息到数据库
Returns:
bool: 是否发送成功
"""
try:
if not self.chat_id:
logger.error(f"{self.log_prefix} 缺少聊天ID")
return False
# 构造命令数据
command_data = {"name": command_name, "args": args or {}}
if self.is_group:
# 群聊
success = await send_api.command_to_group(
success = await send_api.command_to_stream(
command=command_data,
group_id=str(self.group_id),
platform=self.platform,
storage_message=storage_message,
)
else:
# 私聊
success = await send_api.command_to_user(
command=command_data,
user_id=str(self.user_id),
platform=self.platform,
stream_id=self.chat_id,
storage_message=storage_message,
)

View File

@@ -86,30 +86,30 @@ class BaseCommand(ABC):
return current
async def send_text(self, content: str) -> None:
async def send_text(self, content: str, reply_to: str = "") -> bool:
"""发送回复消息
Args:
content: 回复内容
reply_to: 回复消息,格式为"发送者:消息内容"
Returns:
bool: 是否发送成功
"""
# 获取聊天流信息
chat_stream = self.message.chat_stream
if not chat_stream or not hasattr(chat_stream, 'stream_id'):
logger.error(f"{self.log_prefix} 缺少聊天流或stream_id")
return False
if chat_stream.group_info:
# 群聊
await send_api.text_to_group(
text=content, group_id=str(chat_stream.group_info.group_id), platform=chat_stream.platform
)
else:
# 私聊
await send_api.text_to_user(
text=content, user_id=str(chat_stream.user_info.user_id), platform=chat_stream.platform
return await send_api.text_to_stream(
text=content,
stream_id=chat_stream.stream_id,
reply_to=reply_to
)
async def send_type(
self, message_type: str, content: str, display_message: str = None, typing: bool = False
self, message_type: str, content: str, display_message: str = "", typing: bool = False, reply_to: str = ""
) -> bool:
"""发送指定类型的回复消息到当前聊天环境
@@ -117,77 +117,53 @@ class BaseCommand(ABC):
message_type: 消息类型,如"text""image""emoji"
content: 消息内容
display_message: 显示消息(可选)
typing: 是否显示正在输入
reply_to: 回复消息,格式为"发送者:消息内容"
Returns:
bool: 是否发送成功
"""
# 获取聊天流信息
chat_stream = self.message.chat_stream
if not chat_stream or not hasattr(chat_stream, 'stream_id'):
logger.error(f"{self.log_prefix} 缺少聊天流或stream_id")
return False
if chat_stream.group_info:
# 群聊
from src.plugin_system.apis import send_api
return await send_api.custom_message(
return await send_api.custom_to_stream(
message_type=message_type,
content=content,
target_id=str(chat_stream.group_info.group_id),
is_group=True,
platform=chat_stream.platform,
typing=typing,
)
else:
# 私聊
from src.plugin_system.apis import send_api
return await send_api.custom_message(
message_type=message_type,
content=content,
target_id=str(chat_stream.user_info.user_id),
is_group=False,
platform=chat_stream.platform,
stream_id=chat_stream.stream_id,
display_message=display_message,
typing=typing,
reply_to=reply_to,
)
async def send_command(self, command_name: str, args: dict = None, display_message: str = None) -> bool:
async def send_command(self, command_name: str, args: dict = None, display_message: str = "", storage_message: bool = True) -> bool:
"""发送命令消息
Args:
command_name: 命令名称
args: 命令参数
display_message: 显示消息
storage_message: 是否存储消息到数据库
Returns:
bool: 是否发送成功
"""
try:
# 获取聊天流信息
chat_stream = self.message.chat_stream
if not chat_stream or not hasattr(chat_stream, 'stream_id'):
logger.error(f"{self.log_prefix} 缺少聊天流或stream_id")
return False
# 构造命令数据
command_data = {"name": command_name, "args": args or {}}
# 获取聊天流信息
chat_stream = self.message.chat_stream
if chat_stream.group_info:
# 群聊
from src.plugin_system.apis import send_api
success = await send_api.custom_message(
message_type="command",
content=command_data,
target_id=str(chat_stream.group_info.group_id),
is_group=True,
platform=chat_stream.platform,
)
else:
# 私聊
from src.plugin_system.apis import send_api
success = await send_api.custom_message(
message_type="command",
content=command_data,
target_id=str(chat_stream.user_info.user_id),
is_group=False,
platform=chat_stream.platform,
success = await send_api.command_to_stream(
command=command_data,
stream_id=chat_stream.stream_id,
storage_message=storage_message,
)
if success:
@@ -201,6 +177,38 @@ class BaseCommand(ABC):
logger.error(f"{self.log_prefix} 发送命令时出错: {e}")
return False
async def send_emoji(self, emoji_base64: str) -> bool:
"""发送表情包
Args:
emoji_base64: 表情包的base64编码
Returns:
bool: 是否发送成功
"""
chat_stream = self.message.chat_stream
if not chat_stream or not hasattr(chat_stream, 'stream_id'):
logger.error(f"{self.log_prefix} 缺少聊天流或stream_id")
return False
return await send_api.emoji_to_stream(emoji_base64, chat_stream.stream_id)
async def send_image(self, image_base64: str) -> bool:
"""发送图片
Args:
image_base64: 图片的base64编码
Returns:
bool: 是否发送成功
"""
chat_stream = self.message.chat_stream
if not chat_stream or not hasattr(chat_stream, 'stream_id'):
logger.error(f"{self.log_prefix} 缺少聊天流或stream_id")
return False
return await send_api.image_to_stream(image_base64, chat_stream.stream_id)
@classmethod
def get_command_info(cls) -> "CommandInfo":
"""从类属性生成CommandInfo