From 43425b3c1f8539348038387ee9b4b7d88a0bf19e Mon Sep 17 00:00:00 2001 From: SengokuCola <1026294844@qq.com> Date: Thu, 19 Jun 2025 23:21:31 +0800 Subject: [PATCH] =?UTF-8?q?doc=EF=BC=9A=E5=AE=8C=E5=96=84doc?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- docs/plugins/README.md | 159 ++------- docs/plugins/action-components.md | 232 ++----------- docs/plugins/api/chat-api.md | 151 +++++++++ docs/plugins/api/config-api.md | 183 +++++++++++ docs/plugins/api/database-api.md | 258 +++++++++++++++ docs/plugins/api/emoji-api.md | 253 ++++++++++++++ docs/plugins/api/generator-api.md | 341 +++++++++++++++++++ docs/plugins/api/llm-api.md | 244 ++++++++++++++ docs/plugins/api/person-api.md | 342 +++++++++++++++++++ docs/plugins/api/send-api.md | 368 +++++++++++++++++++++ docs/plugins/api/utils-api.md | 435 +++++++++++++++++++++++++ docs/plugins/quick-start.md | 51 +-- plugins/hello_world_plugin/plugin.py | 3 +- src/plugin_system/apis/send_api.py | 102 +++++- src/plugin_system/base/base_action.py | 87 ++--- src/plugin_system/base/base_command.py | 136 ++++---- 16 files changed, 2824 insertions(+), 521 deletions(-) create mode 100644 docs/plugins/api/chat-api.md create mode 100644 docs/plugins/api/config-api.md create mode 100644 docs/plugins/api/database-api.md create mode 100644 docs/plugins/api/emoji-api.md create mode 100644 docs/plugins/api/generator-api.md create mode 100644 docs/plugins/api/llm-api.md create mode 100644 docs/plugins/api/person-api.md create mode 100644 docs/plugins/api/send-api.md create mode 100644 docs/plugins/api/utils-api.md diff --git a/docs/plugins/README.md b/docs/plugins/README.md index 1ae26bb6f..f71aa1e79 100644 --- a/docs/plugins/README.md +++ b/docs/plugins/README.md @@ -2,157 +2,42 @@ > 欢迎来到MaiBot插件系统开发文档!这里是你开始插件开发旅程的最佳起点。 -## 🎯 快速导航 - -### 🌟 新手入门 +## 新手入门 - [📖 快速开始指南](quick-start.md) - 5分钟创建你的第一个插件 + +## 组件功能详解 + - [🧱 Action组件详解](action-components.md) - 掌握最核心的Action组件 - [💻 Command组件详解](command-components.md) - 学习直接响应命令的组件 - [⚙️ 配置管理指南](configuration-guide.md) - 学会使用配置驱动开发 -### 📖 API参考 +- [🔧 工具系统详解](tool-system.md) - 工具系统的使用和开发(非主要功能) -- [📡 消息API](api/message-api.md) - 消息发送接口 +## API浏览 -### 🔧 高级主题 +### 🔗 核心通信API +- [📡 消息API](api/message-api.md) - 消息接收和处理接口 +- [📤 发送API](api/send-api.md) - 各种类型消息发送接口 +- [💬 聊天API](api/chat-api.md) - 聊天流管理和查询接口 -- [📦 依赖管理系统](dependency-management.md) - Python包依赖管理 -- [🔧 工具系统详解](tool-system.md) - 工具系统的使用和开发 +### 🤖 AI与生成API +- [🧠 LLM API](api/llm-api.md) - 大语言模型交互接口 +- [✨ 回复生成器API](api/generator-api.md) - 智能回复生成接口 +- [😊 表情包API](api/emoji-api.md) - 表情包选择和管理接口 -## 🔥 最新更新 (v2.0 新API格式) +### 📊 数据与配置API +- [🗄️ 数据库API](api/database-api.md) - 数据库操作接口 +- [⚙️ 配置API](api/config-api.md) - 配置读取和用户信息接口 +- [👤 个人信息API](api/person-api.md) - 用户信息查询接口 -### 🎉 重大变更 +### 🛠️ 工具API +- [🔧 工具API](api/utils-api.md) - 文件操作、时间处理等工具函数 -1. **新API格式**: - - 不再使用 `self.api`,改为直接方法调用 - - `await self.send_text()` 替代旧的发送方式 - - `await self.send_emoji()` 专门的表情发送方法 - - `self.get_config()` 简化的配置访问 -2. **replyer_1集成**: - - 新增专用的 `generator_api` 模块 - - 在Action中直接使用 `replyer_1` 生成个性化内容 - - 支持多种生成风格和情感色彩 - -3. **更好的类型安全**: - - 完整的类型注解支持 - - 更清晰的返回值类型 - - 更好的IDE支持 - -## 🚀 最佳学习路径 - -### 📚 初学者路径(推荐) - -1. **基础入门**: - ``` - 快速开始指南 → Action组件详解 → Command组件详解 - ``` - -2. **API掌握**: - ``` - 消息API指南 → 配置管理指南 - ``` - -3. **高级功能**: - ``` - 依赖管理系统 → 工具系统详解 - ``` - -## 💡 核心概念速览 - -### 🧱 Action组件 - -- **用途**:增强麦麦的主动行为,让对话更自然 -- **激活**:关键词、LLM判断、随机等多种方式 -- **新特性**:支持replyer_1智能生成、更简洁的API - -### 💻 Command组件 - -- **用途**:响应用户的明确指令,提供确定性功能 -- **触发**:正则表达式匹配用户输入 -- **特点**:即时响应、参数解析、拦截控制 - -### ⚙️ 配置系统 - -- **Schema驱动**:使用ConfigField定义配置结构 -- **类型安全**:强类型配置验证 -- **嵌套访问**:支持 `section.key` 形式访问 - -### 🧠 replyer_1集成 - -- **智能生成**:AI驱动的个性化内容生成 -- **简单易用**:通过 `generator_api` 轻松调用 -- **灵活配置**:支持多种生成风格和参数 - -## 📋 开发清单 - -在开始开发之前,确保你已经: - -- [ ] 阅读了[快速开始指南](quick-start.md) -- [ ] 了解了Action组件或Command组件 -- [ ] 熟悉了[Action组件](action-components.md)或[Command组件](command-components.md) -- [ ] 查看了[配置管理](configuration-guide.md) - -开发完成后,请检查: - -- [ ] 使用了新的API格式(`self.send_text()`等) -- [ ] 正确配置了Schema和ConfigField -- [ ] 添加了适当的错误处理 -- [ ] 测试了所有功能路径 - -## 🤝 获取帮助 - -### 📖 文档问题 - -如果你在文档中发现错误或需要补充,请: +> 如果你在文档中发现错误或需要补充,请: 1. 检查最新的文档版本 2. 查看相关示例代码 3. 参考其他类似插件 - -### 💻 开发问题 - -遇到开发问题时: - -1. 查看现有插件示例 -2. 检查配置是否正确 -3. 参考API文档 - -### 🎯 最佳实践建议 - -为了创建高质量的插件: - -1. 始终使用新的API格式 -2. 充分利用replyer_1的智能生成能力 -3. 设计配置驱动的功能 -4. 实现完善的错误处理 -5. 编写清晰的文档注释 - ---- - -## 🌟 推荐插件示例 - -### 🎯 新手友好 - -- **Hello World插件**:展示基础API使用 -- **简单计算器**:Command组件入门 -- **智能问候**:Action组件和replyer_1集成 - -### 🔧 实用工具 - -- **智能聊天助手**:完整的replyer_1集成示例 -- **用户管理系统**:配置驱动的复杂功能 -- **定时提醒插件**:状态管理和持久化 - -### 🚀 高级应用 - -- **多功能聊天助手**:综合功能展示 -- **游戏管理插件**:复杂状态管理 -- **数据分析插件**:外部服务集成 - ---- - -**🎉 准备好开始了吗?从[快速开始指南](quick-start.md)开始你的插件开发之旅!** - -使用新的API格式,你可以创建更强大、更智能、更易维护的插件。让我们一起构建更好的MaiBot生态系统! +4. 提交文档仓库issue diff --git a/docs/plugins/action-components.md b/docs/plugins/action-components.md index 3b541f14f..e8ae77f74 100644 --- a/docs/plugins/action-components.md +++ b/docs/plugins/action-components.md @@ -171,18 +171,18 @@ class MessageAction(BaseAction): async def execute(self) -> Tuple[bool, str]: # 发送文本消息 - 自动判断群聊/私聊 await self.send_text("Hello World!") - + # 发送表情包 emoji_base64 = await emoji_api.get_by_description("开心") if emoji_base64: await self.send_emoji(emoji_base64) - + # 发送图片 await self.send_image(image_base64) - + # 发送自定义类型消息 await self.send_custom("video", video_data, typing=True) - + return True, "消息发送完成" ``` @@ -200,7 +200,7 @@ class SmartReplyAction(BaseAction): "topic": "日常聊天", "replyer_name": "replyer_1" # 指定使用replyer_1 } - + # 使用generator_api生成回复 success, reply_set = await generator_api.generate_reply( chat_stream=self.chat_stream, @@ -209,7 +209,7 @@ class SmartReplyAction(BaseAction): chat_id=self.chat_id, is_group=self.is_group ) - + if success and reply_set: # 提取并发送文本回复 for reply_type, reply_content in reply_set: @@ -217,14 +217,14 @@ class SmartReplyAction(BaseAction): await self.send_text(reply_content) elif reply_type == "emoji": await self.send_emoji(reply_content) - + # 记录动作 await self.store_action_info( action_build_into_prompt=True, action_prompt_display=f"使用replyer_1生成了智能回复", action_done=True ) - + return True, "智能回复生成成功" else: return False, "回复生成失败" @@ -241,11 +241,11 @@ class ConfigurableAction(BaseAction): enable_feature = self.get_config("features.enable_smart_mode", False) max_length = self.get_config("limits.max_text_length", 200) style = self.get_config("behavior.response_style", "friendly") - + if enable_feature: # 启用高级功能 pass - + return True, "配置获取成功" ``` @@ -258,7 +258,7 @@ class DataAction(BaseAction): async def execute(self) -> Tuple[bool, str]: # 使用database_api from src.plugin_system.apis import database_api - + # 存储数据 await database_api.store_action_info( chat_stream=self.chat_stream, @@ -266,7 +266,7 @@ class DataAction(BaseAction): action_data=self.action_data, # ... 其他参数 ) - + return True, "数据存储完成" ``` @@ -292,19 +292,19 @@ class GreetingAction(BaseAction): "text": "生成一个友好的问候语", "replyer_name": "replyer_1" } - + success, reply_set = await generator_api.generate_reply( chat_stream=self.chat_stream, action_data=greeting_data ) - + if success: for reply_type, content in reply_set: if reply_type == "text": await self.send_text(content) break return True, "发送智能问候" - + # 传统问候方式 await self.send_text("你好!很高兴见到你!") return True, "发送问候" @@ -336,12 +336,12 @@ class HelpAction(BaseAction): "help_type": self.action_data.get("help_type", "general"), "replyer_name": "replyer_1" } - + success, reply_set = await generator_api.generate_reply( chat_stream=self.chat_stream, action_data=help_data ) - + if success: for reply_type, content in reply_set: if reply_type == "text": @@ -366,204 +366,10 @@ class SurpriseAction(BaseAction): async def execute(self) -> Tuple[bool, str]: import random - + surprises = ["🎉", "✨", "🌟", "💝", "🎈"] selected = random.choice(surprises) - + await self.send_emoji(selected) return True, f"发送了惊喜表情: {selected}" ``` - -## 💡 完整示例 - -### 智能聊天Action - -```python -from src.plugin_system.apis import generator_api, emoji_api - -class IntelligentChatAction(BaseAction): - """智能聊天Action - 展示新API的完整用法""" - - # 激活设置 - focus_activation_type = ActionActivationType.ALWAYS - normal_activation_type = ActionActivationType.LLM_JUDGE - mode_enable = ChatMode.ALL - parallel_action = False - - # 基本信息 - action_name = "intelligent_chat" - action_description = "使用replyer_1进行智能聊天回复,支持表情包和个性化回复" - - # LLM判断提示词 - llm_judge_prompt = """ - 判断是否需要进行智能聊天回复: - 1. 用户提出了有趣的话题 - 2. 需要更加个性化的回复 - 3. 适合发送表情包的情况 - - 请回答"是"或"否"。 - """ - - # 功能定义 - action_parameters = { - "topic": "聊天话题", - "mood": "当前氛围(happy/sad/excited/calm)", - "include_emoji": "是否包含表情包(true/false)" - } - - action_require = [ - "需要更个性化回复时使用", - "聊天氛围适合发送表情时使用", - "避免在正式场合使用" - ] - - associated_types = ["text", "emoji"] - - async def execute(self) -> Tuple[bool, str]: - # 获取参数 - topic = self.action_data.get("topic", "日常聊天") - mood = self.action_data.get("mood", "happy") - include_emoji = self.action_data.get("include_emoji", "true") == "true" - - # 构建智能回复数据 - chat_data = { - "text": f"请针对{topic}话题进行回复,当前氛围是{mood}", - "topic": topic, - "mood": mood, - "style": "conversational", - "replyer_name": "replyer_1" # 使用replyer_1 - } - - # 生成智能回复 - success, reply_set = await generator_api.generate_reply( - chat_stream=self.chat_stream, - action_data=chat_data, - platform=self.platform, - chat_id=self.chat_id, - is_group=self.is_group - ) - - reply_sent = False - - if success and reply_set: - # 发送生成的回复 - for reply_type, content in reply_set: - if reply_type == "text": - await self.send_text(content) - reply_sent = True - elif reply_type == "emoji": - await self.send_emoji(content) - - # 如果配置允许且生成失败,发送表情包 - if include_emoji and not reply_sent: - emoji_result = await emoji_api.get_by_description(mood) - if emoji_result: - emoji_base64, emoji_desc, matched_emotion = emoji_result - await self.send_emoji(emoji_base64) - reply_sent = True - - # 记录动作执行 - if reply_sent: - await self.store_action_info( - action_build_into_prompt=True, - action_prompt_display=f"进行了智能聊天回复,话题:{topic},氛围:{mood}", - action_done=True - ) - return True, f"完成智能聊天回复:{topic}" - else: - return False, "智能回复生成失败" -``` - -## 🛠️ 调试技巧 - -### 开发调试Action - -```python -class DebugAction(BaseAction): - """调试Action - 展示如何调试新API""" - - focus_activation_type = ActionActivationType.KEYWORD - normal_activation_type = ActionActivationType.KEYWORD - activation_keywords = ["debug", "调试"] - mode_enable = ChatMode.ALL - parallel_action = True - - action_name = "debug_helper" - action_description = "调试助手,显示当前状态信息" - - action_parameters = {} - action_require = ["需要调试信息时使用"] - associated_types = ["text"] - - async def execute(self) -> Tuple[bool, str]: - # 收集调试信息 - debug_info = { - "聊天类型": "群聊" if self.is_group else "私聊", - "平台": self.platform, - "目标ID": self.target_id, - "用户ID": self.user_id, - "用户昵称": self.user_nickname, - "动作数据": self.action_data, - } - - if self.is_group: - debug_info.update({ - "群ID": self.group_id, - "群名": self.group_name, - }) - - # 格式化调试信息 - info_lines = ["🔍 调试信息:"] - for key, value in debug_info.items(): - info_lines.append(f" • {key}: {value}") - - debug_text = "\n".join(info_lines) - - # 发送调试信息 - await self.send_text(debug_text) - - # 测试配置获取 - test_config = self.get_config("debug.verbose", True) - if test_config: - await self.send_text(f"配置测试: debug.verbose = {test_config}") - - return True, "调试信息已发送" -``` - -## 📚 最佳实践 - -1. **总是导入所需的API模块**: - - ```python - from src.plugin_system.apis import generator_api, send_api, emoji_api - ``` -2. **在生成内容时指定replyer_1**: - - ```python - action_data = { - "text": "生成内容的请求", - "replyer_name": "replyer_1" - } - ``` -3. **使用便捷发送方法**: - - ```python - await self.send_text("文本") # 自动处理群聊/私聊 - await self.send_emoji(emoji_base64) - ``` -4. **合理使用配置**: - - ```python - enable_feature = self.get_config("section.key", default_value) - ``` -5. **总是记录动作信息**: - - ```python - await self.store_action_info( - action_build_into_prompt=True, - action_prompt_display="动作描述", - action_done=True - ) - ``` - -通过使用新的API格式,Action的开发变得更加简洁和强大! diff --git a/docs/plugins/api/chat-api.md b/docs/plugins/api/chat-api.md new file mode 100644 index 000000000..496a58623 --- /dev/null +++ b/docs/plugins/api/chat-api.md @@ -0,0 +1,151 @@ +# 聊天API + +聊天API模块专门负责聊天信息的查询和管理,帮助插件获取和管理不同的聊天流。 + +## 导入方式 + +```python +from src.plugin_system.apis import chat_api +# 或者 +from src.plugin_system.apis.chat_api import ChatManager as chat +``` + +## 主要功能 + +### 1. 获取聊天流 + +#### `get_all_streams(platform: str = "qq") -> List[ChatStream]` +获取所有聊天流 + +**参数:** +- `platform`:平台筛选,默认为"qq" + +**返回:** +- `List[ChatStream]`:聊天流列表 + +**示例:** +```python +streams = chat_api.get_all_streams() +for stream in streams: + print(f"聊天流ID: {stream.stream_id}") +``` + +#### `get_group_streams(platform: str = "qq") -> List[ChatStream]` +获取所有群聊聊天流 + +**参数:** +- `platform`:平台筛选,默认为"qq" + +**返回:** +- `List[ChatStream]`:群聊聊天流列表 + +#### `get_private_streams(platform: str = "qq") -> List[ChatStream]` +获取所有私聊聊天流 + +**参数:** +- `platform`:平台筛选,默认为"qq" + +**返回:** +- `List[ChatStream]`:私聊聊天流列表 + +### 2. 查找特定聊天流 + +#### `get_stream_by_group_id(group_id: str, platform: str = "qq") -> Optional[ChatStream]` +根据群ID获取聊天流 + +**参数:** +- `group_id`:群聊ID +- `platform`:平台,默认为"qq" + +**返回:** +- `Optional[ChatStream]`:聊天流对象,如果未找到返回None + +**示例:** +```python +chat_stream = chat_api.get_stream_by_group_id("123456789") +if chat_stream: + print(f"找到群聊: {chat_stream.group_info.group_name}") +``` + +#### `get_stream_by_user_id(user_id: str, platform: str = "qq") -> Optional[ChatStream]` +根据用户ID获取私聊流 + +**参数:** +- `user_id`:用户ID +- `platform`:平台,默认为"qq" + +**返回:** +- `Optional[ChatStream]`:聊天流对象,如果未找到返回None + +### 3. 聊天流信息查询 + +#### `get_stream_type(chat_stream: ChatStream) -> str` +获取聊天流类型 + +**参数:** +- `chat_stream`:聊天流对象 + +**返回:** +- `str`:聊天类型 ("group", "private", "unknown") + +#### `get_stream_info(chat_stream: ChatStream) -> Dict[str, Any]` +获取聊天流详细信息 + +**参数:** +- `chat_stream`:聊天流对象 + +**返回:** +- `Dict[str, Any]`:聊天流信息字典,包含stream_id、platform、type等信息 + +**示例:** +```python +info = chat_api.get_stream_info(chat_stream) +print(f"聊天类型: {info['type']}") +print(f"平台: {info['platform']}") +if info['type'] == 'group': + print(f"群ID: {info['group_id']}") + print(f"群名: {info['group_name']}") +``` + +#### `get_streams_summary() -> Dict[str, int]` +获取聊天流统计信息 + +**返回:** +- `Dict[str, int]`:包含各平台群聊和私聊数量的统计字典 + +## 使用示例 + +### 基础用法 +```python +from src.plugin_system.apis import chat_api + +# 获取所有群聊 +group_streams = chat_api.get_group_streams() +print(f"共有 {len(group_streams)} 个群聊") + +# 查找特定群聊 +target_group = chat_api.get_stream_by_group_id("123456789") +if target_group: + group_info = chat_api.get_stream_info(target_group) + print(f"群名: {group_info['group_name']}") +``` + +### 遍历所有聊天流 +```python +# 获取所有聊天流并分类处理 +all_streams = chat_api.get_all_streams() + +for stream in all_streams: + stream_type = chat_api.get_stream_type(stream) + if stream_type == "group": + print(f"群聊: {stream.group_info.group_name}") + elif stream_type == "private": + print(f"私聊: {stream.user_info.user_nickname}") +``` + +## 注意事项 + +1. 所有函数都有错误处理,失败时会记录日志 +2. 查询函数返回None或空列表时表示未找到结果 +3. `platform`参数通常为"qq",也可能支持其他平台 +4. `ChatStream`对象包含了聊天的完整信息,包括用户信息、群信息等 \ No newline at end of file diff --git a/docs/plugins/api/config-api.md b/docs/plugins/api/config-api.md new file mode 100644 index 000000000..e61bb6962 --- /dev/null +++ b/docs/plugins/api/config-api.md @@ -0,0 +1,183 @@ +# 配置API + +配置API模块提供了配置读取和用户信息获取等功能,让插件能够安全地访问全局配置和用户信息。 + +## 导入方式 + +```python +from src.plugin_system.apis import config_api +``` + +## 主要功能 + +### 1. 配置访问 + +#### `get_global_config(key: str, default: Any = None) -> Any` +安全地从全局配置中获取一个值 + +**参数:** +- `key`:配置键名,支持嵌套访问如 "section.subsection.key" +- `default`:如果配置不存在时返回的默认值 + +**返回:** +- `Any`:配置值或默认值 + +**示例:** +```python +# 获取机器人昵称 +bot_name = config_api.get_global_config("bot.nickname", "MaiBot") + +# 获取嵌套配置 +llm_model = config_api.get_global_config("model.default.model_name", "gpt-3.5-turbo") + +# 获取不存在的配置 +unknown_config = config_api.get_global_config("unknown.config", "默认值") +``` + +#### `get_plugin_config(plugin_config: dict, key: str, default: Any = None) -> Any` +从插件配置中获取值,支持嵌套键访问 + +**参数:** +- `plugin_config`:插件配置字典 +- `key`:配置键名,支持嵌套访问如 "section.subsection.key" +- `default`:如果配置不存在时返回的默认值 + +**返回:** +- `Any`:配置值或默认值 + +**示例:** +```python +# 在插件中使用 +class MyPlugin(BasePlugin): + async def handle_action(self, action_data, chat_stream): + # 获取插件配置 + api_key = config_api.get_plugin_config(self.config, "api.key", "") + timeout = config_api.get_plugin_config(self.config, "timeout", 30) + + if not api_key: + logger.warning("API密钥未配置") + return False +``` + +### 2. 用户信息API + +#### `get_user_id_by_person_name(person_name: str) -> tuple[str, str]` +根据用户名获取用户ID + +**参数:** +- `person_name`:用户名 + +**返回:** +- `tuple[str, str]`:(平台, 用户ID) + +**示例:** +```python +platform, user_id = await config_api.get_user_id_by_person_name("张三") +if platform and user_id: + print(f"用户张三在{platform}平台的ID是{user_id}") +``` + +#### `get_person_info(person_id: str, key: str, default: Any = None) -> Any` +获取用户信息 + +**参数:** +- `person_id`:用户ID +- `key`:信息键名 +- `default`:默认值 + +**返回:** +- `Any`:用户信息值或默认值 + +**示例:** +```python +# 获取用户昵称 +nickname = await config_api.get_person_info(person_id, "nickname", "未知用户") + +# 获取用户印象 +impression = await config_api.get_person_info(person_id, "impression", "") +``` + +## 使用示例 + +### 配置驱动的插件开发 +```python +from src.plugin_system.apis import config_api +from src.plugin_system.base import BasePlugin + +class WeatherPlugin(BasePlugin): + async def handle_action(self, action_data, chat_stream): + # 从全局配置获取API配置 + api_endpoint = config_api.get_global_config("weather.api_endpoint", "") + default_city = config_api.get_global_config("weather.default_city", "北京") + + # 从插件配置获取特定设置 + api_key = config_api.get_plugin_config(self.config, "api_key", "") + timeout = config_api.get_plugin_config(self.config, "timeout", 10) + + if not api_key: + return {"success": False, "message": "Weather API密钥未配置"} + + # 使用配置进行天气查询... + return {"success": True, "message": f"{default_city}今天天气晴朗"} +``` + +### 用户信息查询 +```python +async def get_user_by_name(user_name: str): + """根据用户名获取完整的用户信息""" + + # 获取用户的平台和ID + platform, user_id = await config_api.get_user_id_by_person_name(user_name) + + if not platform or not user_id: + return None + + # 构建person_id + from src.person_info.person_info import PersonInfoManager + person_id = PersonInfoManager.get_person_id(platform, user_id) + + # 获取用户详细信息 + nickname = await config_api.get_person_info(person_id, "nickname", user_name) + impression = await config_api.get_person_info(person_id, "impression", "") + + return { + "platform": platform, + "user_id": user_id, + "nickname": nickname, + "impression": impression + } +``` + +## 配置键名说明 + +### 常用全局配置键 +- `bot.nickname`:机器人昵称 +- `bot.qq_account`:机器人QQ号 +- `model.default`:默认LLM模型配置 +- `database.path`:数据库路径 + +### 嵌套配置访问 +配置支持点号分隔的嵌套访问: +```python +# config.toml 中的配置: +# [bot] +# nickname = "MaiBot" +# qq_account = "123456" +# +# [model.default] +# model_name = "gpt-3.5-turbo" +# temperature = 0.7 + +# API调用: +bot_name = config_api.get_global_config("bot.nickname") +model_name = config_api.get_global_config("model.default.model_name") +temperature = config_api.get_global_config("model.default.temperature") +``` + +## 注意事项 + +1. **只读访问**:配置API只提供读取功能,插件不能修改全局配置 +2. **异步函数**:用户信息相关的函数是异步的,需要使用`await` +3. **错误处理**:所有函数都有错误处理,失败时会记录日志并返回默认值 +4. **安全性**:插件通过此API访问配置是安全和隔离的 +5. **性能**:频繁访问的配置建议在插件初始化时获取并缓存 \ No newline at end of file diff --git a/docs/plugins/api/database-api.md b/docs/plugins/api/database-api.md new file mode 100644 index 000000000..174bef158 --- /dev/null +++ b/docs/plugins/api/database-api.md @@ -0,0 +1,258 @@ +# 数据库API + +数据库API模块提供通用的数据库操作功能,支持查询、创建、更新和删除记录,采用Peewee ORM模型。 + +## 导入方式 + +```python +from src.plugin_system.apis import database_api +``` + +## 主要功能 + +### 1. 通用数据库查询 + +#### `db_query(model_class, query_type="get", filters=None, data=None, limit=None, order_by=None, single_result=False)` +执行数据库查询操作的通用接口 + +**参数:** +- `model_class`:Peewee模型类,如ActionRecords、Messages等 +- `query_type`:查询类型,可选值: "get", "create", "update", "delete", "count" +- `filters`:过滤条件字典,键为字段名,值为要匹配的值 +- `data`:用于创建或更新的数据字典 +- `limit`:限制结果数量 +- `order_by`:排序字段列表,使用字段名,前缀'-'表示降序 +- `single_result`:是否只返回单个结果 + +**返回:** +根据查询类型返回不同的结果: +- "get":返回查询结果列表或单个结果 +- "create":返回创建的记录 +- "update":返回受影响的行数 +- "delete":返回受影响的行数 +- "count":返回记录数量 + +### 2. 便捷查询函数 + +#### `db_save(model_class, data, key_field=None, key_value=None)` +保存数据到数据库(创建或更新) + +**参数:** +- `model_class`:Peewee模型类 +- `data`:要保存的数据字典 +- `key_field`:用于查找现有记录的字段名 +- `key_value`:用于查找现有记录的字段值 + +**返回:** +- `Dict[str, Any]`:保存后的记录数据,失败时返回None + +#### `db_get(model_class, filters=None, order_by=None, limit=None)` +简化的查询函数 + +**参数:** +- `model_class`:Peewee模型类 +- `filters`:过滤条件字典 +- `order_by`:排序字段 +- `limit`:限制结果数量 + +**返回:** +- `Union[List[Dict], Dict, None]`:查询结果 + +### 3. 专用函数 + +#### `store_action_info(...)` +存储动作信息的专用函数 + +## 使用示例 + +### 1. 基本查询操作 + +```python +from src.plugin_system.apis import database_api +from src.common.database.database_model import Messages, ActionRecords + +# 查询最近10条消息 +messages = await database_api.db_query( + Messages, + query_type="get", + filters={"chat_id": chat_stream.stream_id}, + limit=10, + order_by=["-time"] +) + +# 查询单条记录 +message = await database_api.db_query( + Messages, + query_type="get", + filters={"message_id": "msg_123"}, + single_result=True +) +``` + +### 2. 创建记录 + +```python +# 创建新的动作记录 +new_record = await database_api.db_query( + ActionRecords, + query_type="create", + data={ + "action_id": "action_123", + "time": time.time(), + "action_name": "TestAction", + "action_done": True + } +) + +print(f"创建了记录: {new_record['id']}") +``` + +### 3. 更新记录 + +```python +# 更新动作状态 +updated_count = await database_api.db_query( + ActionRecords, + query_type="update", + filters={"action_id": "action_123"}, + data={"action_done": True, "completion_time": time.time()} +) + +print(f"更新了 {updated_count} 条记录") +``` + +### 4. 删除记录 + +```python +# 删除过期记录 +deleted_count = await database_api.db_query( + ActionRecords, + query_type="delete", + filters={"time__lt": time.time() - 86400} # 删除24小时前的记录 +) + +print(f"删除了 {deleted_count} 条过期记录") +``` + +### 5. 统计查询 + +```python +# 统计消息数量 +message_count = await database_api.db_query( + Messages, + query_type="count", + filters={"chat_id": chat_stream.stream_id} +) + +print(f"该聊天有 {message_count} 条消息") +``` + +### 6. 使用便捷函数 + +```python +# 使用db_save进行创建或更新 +record = await database_api.db_save( + ActionRecords, + { + "action_id": "action_123", + "time": time.time(), + "action_name": "TestAction", + "action_done": True + }, + key_field="action_id", + key_value="action_123" +) + +# 使用db_get进行简单查询 +recent_messages = await database_api.db_get( + Messages, + filters={"chat_id": chat_stream.stream_id}, + order_by="-time", + limit=5 +) +``` + +## 高级用法 + +### 复杂查询示例 + +```python +# 查询特定用户在特定时间段的消息 +user_messages = await database_api.db_query( + Messages, + query_type="get", + filters={ + "user_id": "123456", + "time__gte": start_time, # 大于等于开始时间 + "time__lt": end_time # 小于结束时间 + }, + order_by=["-time"], + limit=50 +) + +# 批量处理 +for message in user_messages: + print(f"消息内容: {message['plain_text']}") + print(f"发送时间: {message['time']}") +``` + +### 插件中的数据持久化 + +```python +from src.plugin_system.base import BasePlugin +from src.plugin_system.apis import database_api + +class DataPlugin(BasePlugin): + async def handle_action(self, action_data, chat_stream): + # 保存插件数据 + plugin_data = { + "plugin_name": self.plugin_name, + "chat_id": chat_stream.stream_id, + "data": json.dumps(action_data), + "created_time": time.time() + } + + # 使用自定义表模型(需要先定义) + record = await database_api.db_save( + PluginData, # 假设的插件数据模型 + plugin_data, + key_field="plugin_name", + key_value=self.plugin_name + ) + + return {"success": True, "record_id": record["id"]} +``` + +## 数据模型 + +### 常用模型类 +系统提供了以下常用的数据模型: + +- `Messages`:消息记录 +- `ActionRecords`:动作记录 +- `UserInfo`:用户信息 +- `GroupInfo`:群组信息 + +### 字段说明 + +#### Messages模型主要字段 +- `message_id`:消息ID +- `chat_id`:聊天ID +- `user_id`:用户ID +- `plain_text`:纯文本内容 +- `time`:时间戳 + +#### ActionRecords模型主要字段 +- `action_id`:动作ID +- `action_name`:动作名称 +- `action_done`:是否完成 +- `time`:创建时间 + +## 注意事项 + +1. **异步操作**:所有数据库API都是异步的,必须使用`await` +2. **错误处理**:函数内置错误处理,失败时返回None或空列表 +3. **数据类型**:返回的都是字典格式的数据,不是模型对象 +4. **性能考虑**:使用`limit`参数避免查询大量数据 +5. **过滤条件**:支持简单的等值过滤,复杂查询需要使用原生Peewee语法 +6. **事务**:如需事务支持,建议直接使用Peewee的事务功能 \ No newline at end of file diff --git a/docs/plugins/api/emoji-api.md b/docs/plugins/api/emoji-api.md new file mode 100644 index 000000000..3346db9f9 --- /dev/null +++ b/docs/plugins/api/emoji-api.md @@ -0,0 +1,253 @@ +# 表情包API + +表情包API模块提供表情包的获取、查询和管理功能,让插件能够智能地选择和使用表情包。 + +## 导入方式 + +```python +from src.plugin_system.apis import emoji_api +``` + +## 主要功能 + +### 1. 表情包获取 + +#### `get_by_description(description: str) -> Optional[Tuple[str, str, str]]` +根据场景描述选择表情包 + +**参数:** +- `description`:场景描述文本,例如"开心的大笑"、"轻微的讽刺"、"表示无奈和沮丧"等 + +**返回:** +- `Optional[Tuple[str, str, str]]`:(base64编码, 表情包描述, 匹配的场景) 或 None + +**示例:** +```python +emoji_result = await emoji_api.get_by_description("开心的大笑") +if emoji_result: + emoji_base64, description, matched_scene = emoji_result + print(f"获取到表情包: {description}, 场景: {matched_scene}") + # 可以将emoji_base64用于发送表情包 +``` + +#### `get_random() -> Optional[Tuple[str, str, str]]` +随机获取表情包 + +**返回:** +- `Optional[Tuple[str, str, str]]`:(base64编码, 表情包描述, 随机场景) 或 None + +**示例:** +```python +random_emoji = await emoji_api.get_random() +if random_emoji: + emoji_base64, description, scene = random_emoji + print(f"随机表情包: {description}") +``` + +#### `get_by_emotion(emotion: str) -> Optional[Tuple[str, str, str]]` +根据场景关键词获取表情包 + +**参数:** +- `emotion`:场景关键词,如"大笑"、"讽刺"、"无奈"等 + +**返回:** +- `Optional[Tuple[str, str, str]]`:(base64编码, 表情包描述, 匹配的场景) 或 None + +**示例:** +```python +emoji_result = await emoji_api.get_by_emotion("讽刺") +if emoji_result: + emoji_base64, description, scene = emoji_result + # 发送讽刺表情包 +``` + +### 2. 表情包信息查询 + +#### `get_count() -> int` +获取表情包数量 + +**返回:** +- `int`:当前可用的表情包数量 + +#### `get_info() -> dict` +获取表情包系统信息 + +**返回:** +- `dict`:包含表情包数量、最大数量等信息 + +**返回字典包含:** +- `current_count`:当前表情包数量 +- `max_count`:最大表情包数量 +- `available_emojis`:可用表情包数量 + +#### `get_emotions() -> list` +获取所有可用的场景关键词 + +**返回:** +- `list`:所有表情包的场景关键词列表(去重) + +#### `get_descriptions() -> list` +获取所有表情包的描述列表 + +**返回:** +- `list`:所有表情包的描述文本列表 + +## 使用示例 + +### 1. 智能表情包选择 + +```python +from src.plugin_system.apis import emoji_api + +async def send_emotion_response(message_text: str, chat_stream): + """根据消息内容智能选择表情包回复""" + + # 分析消息场景 + if "哈哈" in message_text or "好笑" in message_text: + emoji_result = await emoji_api.get_by_description("开心的大笑") + elif "无语" in message_text or "算了" in message_text: + emoji_result = await emoji_api.get_by_description("表示无奈和沮丧") + elif "呵呵" in message_text or "是吗" in message_text: + emoji_result = await emoji_api.get_by_description("轻微的讽刺") + elif "生气" in message_text or "愤怒" in message_text: + emoji_result = await emoji_api.get_by_description("愤怒和不满") + else: + # 随机选择一个表情包 + emoji_result = await emoji_api.get_random() + + if emoji_result: + emoji_base64, description, scene = emoji_result + # 使用send_api发送表情包 + from src.plugin_system.apis import send_api + success = await send_api.emoji_to_group(emoji_base64, chat_stream.group_info.group_id) + return success + + return False +``` + +### 2. 表情包管理功能 + +```python +async def show_emoji_stats(): + """显示表情包统计信息""" + + # 获取基本信息 + count = emoji_api.get_count() + info = emoji_api.get_info() + scenes = emoji_api.get_emotions() # 实际返回的是场景关键词 + + stats = f""" +📊 表情包统计信息: +- 总数量: {count} +- 可用数量: {info['available_emojis']} +- 最大容量: {info['max_count']} +- 支持场景: {len(scenes)}种 + +🎭 支持的场景关键词: {', '.join(scenes[:10])}{'...' if len(scenes) > 10 else ''} + """ + + return stats +``` + +### 3. 表情包测试功能 + +```python +async def test_emoji_system(): + """测试表情包系统的各种功能""" + + print("=== 表情包系统测试 ===") + + # 测试场景描述查找 + test_descriptions = ["开心的大笑", "轻微的讽刺", "表示无奈和沮丧", "愤怒和不满"] + for desc in test_descriptions: + result = await emoji_api.get_by_description(desc) + if result: + _, description, scene = result + print(f"✅ 场景'{desc}' -> {description} ({scene})") + else: + print(f"❌ 场景'{desc}' -> 未找到") + + # 测试关键词查找 + scenes = emoji_api.get_emotions() + if scenes: + test_scene = scenes[0] + result = await emoji_api.get_by_emotion(test_scene) + if result: + print(f"✅ 关键词'{test_scene}' -> 找到匹配表情包") + + # 测试随机获取 + random_result = await emoji_api.get_random() + if random_result: + print("✅ 随机获取 -> 成功") + + print(f"📊 系统信息: {emoji_api.get_info()}") +``` + +### 4. 在Action中使用表情包 + +```python +from src.plugin_system.base import BaseAction + +class EmojiAction(BaseAction): + async def execute(self, action_data, chat_stream): + # 从action_data获取场景描述或关键词 + scene_keyword = action_data.get("scene", "") + scene_description = action_data.get("description", "") + + emoji_result = None + + # 优先使用具体的场景描述 + if scene_description: + emoji_result = await emoji_api.get_by_description(scene_description) + # 其次使用场景关键词 + elif scene_keyword: + emoji_result = await emoji_api.get_by_emotion(scene_keyword) + # 最后随机选择 + else: + emoji_result = await emoji_api.get_random() + + if emoji_result: + emoji_base64, description, scene = emoji_result + return { + "success": True, + "emoji_base64": emoji_base64, + "description": description, + "scene": scene + } + + return {"success": False, "message": "未找到合适的表情包"} +``` + +## 场景描述说明 + +### 常用场景描述 +表情包系统支持多种具体的场景描述,常见的包括: + +- **开心类场景**:开心的大笑、满意的微笑、兴奋的手舞足蹈 +- **无奈类场景**:表示无奈和沮丧、轻微的讽刺、无语的摇头 +- **愤怒类场景**:愤怒和不满、生气的瞪视、暴躁的抓狂 +- **惊讶类场景**:震惊的表情、意外的发现、困惑的思考 +- **可爱类场景**:卖萌的表情、撒娇的动作、害羞的样子 + +### 场景关键词示例 +系统支持的场景关键词包括: +- 大笑、微笑、兴奋、手舞足蹈 +- 无奈、沮丧、讽刺、无语、摇头 +- 愤怒、不满、生气、瞪视、抓狂 +- 震惊、意外、困惑、思考 +- 卖萌、撒娇、害羞、可爱 + +### 匹配机制 +- **精确匹配**:优先匹配完整的场景描述,如"开心的大笑" +- **关键词匹配**:如果没有精确匹配,则根据关键词进行模糊匹配 +- **语义匹配**:系统会理解场景的语义含义进行智能匹配 + +## 注意事项 + +1. **异步函数**:获取表情包的函数都是异步的,需要使用 `await` +2. **返回格式**:表情包以base64编码返回,可直接用于发送 +3. **错误处理**:所有函数都有错误处理,失败时返回None或默认值 +4. **使用统计**:系统会记录表情包的使用次数 +5. **文件依赖**:表情包依赖于本地文件,确保表情包文件存在 +6. **编码格式**:返回的是base64编码的图片数据,可直接用于网络传输 +7. **场景理解**:系统能理解具体的场景描述,比简单的情感分类更准确 diff --git a/docs/plugins/api/generator-api.md b/docs/plugins/api/generator-api.md new file mode 100644 index 000000000..964fff84a --- /dev/null +++ b/docs/plugins/api/generator-api.md @@ -0,0 +1,341 @@ +# 回复生成器API + +回复生成器API模块提供智能回复生成功能,让插件能够使用系统的回复生成器来产生自然的聊天回复。 + +## 导入方式 + +```python +from src.plugin_system.apis import generator_api +``` + +## 主要功能 + +### 1. 回复器获取 + +#### `get_replyer(chat_stream=None, platform=None, chat_id=None, is_group=True)` +获取回复器对象 + +**参数:** +- `chat_stream`:聊天流对象(优先) +- `platform`:平台名称,如"qq" +- `chat_id`:聊天ID(群ID或用户ID) +- `is_group`:是否为群聊 + +**返回:** +- `DefaultReplyer`:回复器对象,如果获取失败则返回None + +**示例:** +```python +# 使用聊天流获取回复器 +replyer = generator_api.get_replyer(chat_stream=chat_stream) + +# 使用平台和ID获取回复器 +replyer = generator_api.get_replyer( + platform="qq", + chat_id="123456789", + is_group=True +) +``` + +### 2. 回复生成 + +#### `generate_reply(chat_stream=None, action_data=None, platform=None, chat_id=None, is_group=True)` +生成回复 + +**参数:** +- `chat_stream`:聊天流对象(优先) +- `action_data`:动作数据 +- `platform`:平台名称(备用) +- `chat_id`:聊天ID(备用) +- `is_group`:是否为群聊(备用) + +**返回:** +- `Tuple[bool, List[Tuple[str, Any]]]`:(是否成功, 回复集合) + +**示例:** +```python +success, reply_set = await generator_api.generate_reply( + chat_stream=chat_stream, + action_data={"message": "你好", "intent": "greeting"} +) + +if success: + for reply_type, reply_content in reply_set: + print(f"回复类型: {reply_type}, 内容: {reply_content}") +``` + +#### `rewrite_reply(chat_stream=None, reply_data=None, platform=None, chat_id=None, is_group=True)` +重写回复 + +**参数:** +- `chat_stream`:聊天流对象(优先) +- `reply_data`:回复数据 +- `platform`:平台名称(备用) +- `chat_id`:聊天ID(备用) +- `is_group`:是否为群聊(备用) + +**返回:** +- `Tuple[bool, List[Tuple[str, Any]]]`:(是否成功, 回复集合) + +**示例:** +```python +success, reply_set = await generator_api.rewrite_reply( + chat_stream=chat_stream, + reply_data={"original_text": "原始回复", "style": "more_friendly"} +) +``` + +## 使用示例 + +### 1. 基础回复生成 + +```python +from src.plugin_system.apis import generator_api + +async def generate_greeting_reply(chat_stream, user_name): + """生成问候回复""" + + action_data = { + "intent": "greeting", + "user_name": user_name, + "context": "morning_greeting" + } + + success, reply_set = await generator_api.generate_reply( + chat_stream=chat_stream, + action_data=action_data + ) + + if success and reply_set: + # 获取第一个回复 + reply_type, reply_content = reply_set[0] + return reply_content + + return "你好!" # 默认回复 +``` + +### 2. 在Action中使用回复生成器 + +```python +from src.plugin_system.base import BaseAction + +class ChatAction(BaseAction): + async def execute(self, action_data, chat_stream): + # 准备回复数据 + reply_context = { + "message_type": "response", + "user_input": action_data.get("user_message", ""), + "intent": action_data.get("intent", ""), + "entities": action_data.get("entities", {}), + "context": self.get_conversation_context(chat_stream) + } + + # 生成回复 + success, reply_set = await generator_api.generate_reply( + chat_stream=chat_stream, + action_data=reply_context + ) + + if success: + return { + "success": True, + "replies": reply_set, + "generated_count": len(reply_set) + } + + return { + "success": False, + "error": "回复生成失败", + "fallback_reply": "抱歉,我现在无法理解您的消息。" + } +``` + +### 3. 多样化回复生成 + +```python +async def generate_diverse_replies(chat_stream, topic, count=3): + """生成多个不同风格的回复""" + + styles = ["formal", "casual", "humorous"] + all_replies = [] + + for i, style in enumerate(styles[:count]): + action_data = { + "topic": topic, + "style": style, + "variation": i + } + + success, reply_set = await generator_api.generate_reply( + chat_stream=chat_stream, + action_data=action_data + ) + + if success and reply_set: + all_replies.extend(reply_set) + + return all_replies +``` + +### 4. 回复重写功能 + +```python +async def improve_reply(chat_stream, original_reply, improvement_type="more_friendly"): + """改进原始回复""" + + reply_data = { + "original_text": original_reply, + "improvement_type": improvement_type, + "target_audience": "young_users", + "tone": "positive" + } + + success, improved_replies = await generator_api.rewrite_reply( + chat_stream=chat_stream, + reply_data=reply_data + ) + + if success and improved_replies: + # 返回改进后的第一个回复 + _, improved_content = improved_replies[0] + return improved_content + + return original_reply # 如果改进失败,返回原始回复 +``` + +### 5. 条件回复生成 + +```python +async def conditional_reply_generation(chat_stream, user_message, user_emotion): + """根据用户情感生成条件回复""" + + # 根据情感调整回复策略 + if user_emotion == "sad": + action_data = { + "intent": "comfort", + "tone": "empathetic", + "style": "supportive" + } + elif user_emotion == "angry": + action_data = { + "intent": "calm", + "tone": "peaceful", + "style": "understanding" + } + else: + action_data = { + "intent": "respond", + "tone": "neutral", + "style": "helpful" + } + + action_data["user_message"] = user_message + action_data["user_emotion"] = user_emotion + + success, reply_set = await generator_api.generate_reply( + chat_stream=chat_stream, + action_data=action_data + ) + + return reply_set if success else [] +``` + +## 回复集合格式 + +### 回复类型 +生成的回复集合包含多种类型的回复: + +- `"text"`:纯文本回复 +- `"emoji"`:表情包回复 +- `"image"`:图片回复 +- `"mixed"`:混合类型回复 + +### 回复集合结构 +```python +# 示例回复集合 +reply_set = [ + ("text", "很高兴见到你!"), + ("emoji", "emoji_base64_data"), + ("text", "有什么可以帮助你的吗?") +] +``` + +## 高级用法 + +### 1. 自定义回复器配置 + +```python +async def generate_with_custom_config(chat_stream, action_data): + """使用自定义配置生成回复""" + + # 获取回复器 + replyer = generator_api.get_replyer(chat_stream=chat_stream) + + if replyer: + # 可以访问回复器的内部方法 + success, reply_set = await replyer.generate_reply_with_context( + reply_data=action_data, + # 可以传递额外的配置参数 + ) + return success, reply_set + + return False, [] +``` + +### 2. 回复质量评估 + +```python +async def generate_and_evaluate_replies(chat_stream, action_data): + """生成回复并评估质量""" + + success, reply_set = await generator_api.generate_reply( + chat_stream=chat_stream, + action_data=action_data + ) + + if success: + evaluated_replies = [] + for reply_type, reply_content in reply_set: + # 简单的质量评估 + quality_score = evaluate_reply_quality(reply_content) + evaluated_replies.append({ + "type": reply_type, + "content": reply_content, + "quality": quality_score + }) + + # 按质量排序 + evaluated_replies.sort(key=lambda x: x["quality"], reverse=True) + return evaluated_replies + + return [] + +def evaluate_reply_quality(reply_content): + """简单的回复质量评估""" + if not reply_content: + return 0 + + score = 50 # 基础分 + + # 长度适中加分 + if 5 <= len(reply_content) <= 100: + score += 20 + + # 包含积极词汇加分 + positive_words = ["好", "棒", "不错", "感谢", "开心"] + for word in positive_words: + if word in reply_content: + score += 10 + break + + return min(score, 100) +``` + +## 注意事项 + +1. **异步操作**:所有生成函数都是异步的,必须使用`await` +2. **错误处理**:函数内置错误处理,失败时返回False和空列表 +3. **聊天流依赖**:需要有效的聊天流对象才能正常工作 +4. **性能考虑**:回复生成可能需要一些时间,特别是使用LLM时 +5. **回复格式**:返回的回复集合是元组列表,包含类型和内容 +6. **上下文感知**:生成器会考虑聊天上下文和历史消息 \ No newline at end of file diff --git a/docs/plugins/api/llm-api.md b/docs/plugins/api/llm-api.md new file mode 100644 index 000000000..e0879ddfc --- /dev/null +++ b/docs/plugins/api/llm-api.md @@ -0,0 +1,244 @@ +# LLM API + +LLM API模块提供与大语言模型交互的功能,让插件能够使用系统配置的LLM模型进行内容生成。 + +## 导入方式 + +```python +from src.plugin_system.apis import llm_api +``` + +## 主要功能 + +### 1. 模型管理 + +#### `get_available_models() -> Dict[str, Any]` +获取所有可用的模型配置 + +**返回:** +- `Dict[str, Any]`:模型配置字典,key为模型名称,value为模型配置 + +**示例:** +```python +models = llm_api.get_available_models() +for model_name, model_config in models.items(): + print(f"模型: {model_name}") + print(f"配置: {model_config}") +``` + +### 2. 内容生成 + +#### `generate_with_model(prompt, model_config, request_type="plugin.generate", **kwargs)` +使用指定模型生成内容 + +**参数:** +- `prompt`:提示词 +- `model_config`:模型配置(从 get_available_models 获取) +- `request_type`:请求类型标识 +- `**kwargs`:其他模型特定参数,如temperature、max_tokens等 + +**返回:** +- `Tuple[bool, str, str, str]`:(是否成功, 生成的内容, 推理过程, 模型名称) + +**示例:** +```python +models = llm_api.get_available_models() +default_model = models.get("default") + +if default_model: + success, response, reasoning, model_name = await llm_api.generate_with_model( + prompt="请写一首关于春天的诗", + model_config=default_model, + temperature=0.7, + max_tokens=200 + ) + + if success: + print(f"生成内容: {response}") + print(f"使用模型: {model_name}") +``` + +## 使用示例 + +### 1. 基础文本生成 + +```python +from src.plugin_system.apis import llm_api + +async def generate_story(topic: str): + """生成故事""" + models = llm_api.get_available_models() + model = models.get("default") + + if not model: + return "未找到可用模型" + + prompt = f"请写一个关于{topic}的短故事,大约100字左右。" + + success, story, reasoning, model_name = await llm_api.generate_with_model( + prompt=prompt, + model_config=model, + request_type="story.generate", + temperature=0.8, + max_tokens=150 + ) + + return story if success else "故事生成失败" +``` + +### 2. 在Action中使用LLM + +```python +from src.plugin_system.base import BaseAction + +class LLMAction(BaseAction): + async def execute(self, action_data, chat_stream): + # 获取用户输入 + user_input = action_data.get("user_message", "") + intent = action_data.get("intent", "chat") + + # 获取模型配置 + models = llm_api.get_available_models() + model = models.get("default") + + if not model: + return {"success": False, "error": "未配置LLM模型"} + + # 构建提示词 + prompt = self.build_prompt(user_input, intent) + + # 生成回复 + success, response, reasoning, model_name = await llm_api.generate_with_model( + prompt=prompt, + model_config=model, + request_type=f"plugin.{self.plugin_name}", + temperature=0.7 + ) + + if success: + return { + "success": True, + "response": response, + "model_used": model_name, + "reasoning": reasoning + } + + return {"success": False, "error": response} + + def build_prompt(self, user_input: str, intent: str) -> str: + """构建提示词""" + base_prompt = "你是一个友善的AI助手。" + + if intent == "question": + return f"{base_prompt}\n\n用户问题:{user_input}\n\n请提供准确、有用的回答:" + elif intent == "chat": + return f"{base_prompt}\n\n用户说:{user_input}\n\n请进行自然的对话:" + else: + return f"{base_prompt}\n\n用户输入:{user_input}\n\n请回复:" +``` + +### 3. 多模型对比 + +```python +async def compare_models(prompt: str): + """使用多个模型生成内容并对比""" + models = llm_api.get_available_models() + results = {} + + for model_name, model_config in models.items(): + success, response, reasoning, actual_model = await llm_api.generate_with_model( + prompt=prompt, + model_config=model_config, + request_type="comparison.test" + ) + + results[model_name] = { + "success": success, + "response": response, + "model": actual_model, + "reasoning": reasoning + } + + return results +``` + +### 4. 智能对话插件 + +```python +class ChatbotPlugin(BasePlugin): + async def handle_action(self, action_data, chat_stream): + user_message = action_data.get("message", "") + + # 获取历史对话上下文 + context = self.get_conversation_context(chat_stream) + + # 构建对话提示词 + prompt = self.build_conversation_prompt(user_message, context) + + # 获取模型配置 + models = llm_api.get_available_models() + chat_model = models.get("chat", models.get("default")) + + if not chat_model: + return {"success": False, "message": "聊天模型未配置"} + + # 生成回复 + success, response, reasoning, model_name = await llm_api.generate_with_model( + prompt=prompt, + model_config=chat_model, + request_type="chat.conversation", + temperature=0.8, + max_tokens=500 + ) + + if success: + # 保存对话历史 + self.save_conversation(chat_stream, user_message, response) + + return { + "success": True, + "reply": response, + "model": model_name + } + + return {"success": False, "message": "回复生成失败"} + + def build_conversation_prompt(self, user_message: str, context: list) -> str: + """构建对话提示词""" + prompt = "你是一个有趣、友善的聊天机器人。请自然地回复用户的消息。\n\n" + + # 添加历史对话 + if context: + prompt += "对话历史:\n" + for msg in context[-5:]: # 只保留最近5条 + prompt += f"用户: {msg['user']}\n机器人: {msg['bot']}\n" + prompt += "\n" + + prompt += f"用户: {user_message}\n机器人: " + return prompt +``` + +## 模型配置说明 + +### 常用模型类型 +- `default`:默认模型 +- `chat`:聊天专用模型 +- `creative`:创意生成模型 +- `code`:代码生成模型 + +### 配置参数 +LLM模型支持的常用参数: +- `temperature`:控制输出随机性(0.0-1.0) +- `max_tokens`:最大生成长度 +- `top_p`:核采样参数 +- `frequency_penalty`:频率惩罚 +- `presence_penalty`:存在惩罚 + +## 注意事项 + +1. **异步操作**:LLM生成是异步的,必须使用`await` +2. **错误处理**:生成失败时返回False和错误信息 +3. **配置依赖**:需要正确配置模型才能使用 +4. **请求类型**:建议为不同用途设置不同的request_type +5. **性能考虑**:LLM调用可能较慢,考虑超时和缓存 +6. **成本控制**:注意控制max_tokens以控制成本 \ No newline at end of file diff --git a/docs/plugins/api/person-api.md b/docs/plugins/api/person-api.md new file mode 100644 index 000000000..3e1bafaf7 --- /dev/null +++ b/docs/plugins/api/person-api.md @@ -0,0 +1,342 @@ +# 个人信息API + +个人信息API模块提供用户信息查询和管理功能,让插件能够获取和使用用户的相关信息。 + +## 导入方式 + +```python +from src.plugin_system.apis import person_api +``` + +## 主要功能 + +### 1. Person ID管理 + +#### `get_person_id(platform: str, user_id: int) -> str` +根据平台和用户ID获取person_id + +**参数:** +- `platform`:平台名称,如 "qq", "telegram" 等 +- `user_id`:用户ID + +**返回:** +- `str`:唯一的person_id(MD5哈希值) + +**示例:** +```python +person_id = person_api.get_person_id("qq", 123456) +print(f"Person ID: {person_id}") +``` + +### 2. 用户信息查询 + +#### `get_person_value(person_id: str, field_name: str, default: Any = None) -> Any` +根据person_id和字段名获取某个值 + +**参数:** +- `person_id`:用户的唯一标识ID +- `field_name`:要获取的字段名,如 "nickname", "impression" 等 +- `default`:当字段不存在或获取失败时返回的默认值 + +**返回:** +- `Any`:字段值或默认值 + +**示例:** +```python +nickname = await person_api.get_person_value(person_id, "nickname", "未知用户") +impression = await person_api.get_person_value(person_id, "impression") +``` + +#### `get_person_values(person_id: str, field_names: list, default_dict: dict = None) -> dict` +批量获取用户信息字段值 + +**参数:** +- `person_id`:用户的唯一标识ID +- `field_names`:要获取的字段名列表 +- `default_dict`:默认值字典,键为字段名,值为默认值 + +**返回:** +- `dict`:字段名到值的映射字典 + +**示例:** +```python +values = await person_api.get_person_values( + person_id, + ["nickname", "impression", "know_times"], + {"nickname": "未知用户", "know_times": 0} +) +``` + +### 3. 用户状态查询 + +#### `is_person_known(platform: str, user_id: int) -> bool` +判断是否认识某个用户 + +**参数:** +- `platform`:平台名称 +- `user_id`:用户ID + +**返回:** +- `bool`:是否认识该用户 + +**示例:** +```python +known = await person_api.is_person_known("qq", 123456) +if known: + print("这个用户我认识") +``` + +### 4. 用户名查询 + +#### `get_person_id_by_name(person_name: str) -> str` +根据用户名获取person_id + +**参数:** +- `person_name`:用户名 + +**返回:** +- `str`:person_id,如果未找到返回空字符串 + +**示例:** +```python +person_id = person_api.get_person_id_by_name("张三") +if person_id: + print(f"找到用户: {person_id}") +``` + +## 使用示例 + +### 1. 基础用户信息获取 + +```python +from src.plugin_system.apis import person_api + +async def get_user_info(platform: str, user_id: int): + """获取用户基本信息""" + + # 获取person_id + person_id = person_api.get_person_id(platform, user_id) + + # 获取用户信息 + user_info = await person_api.get_person_values( + person_id, + ["nickname", "impression", "know_times", "last_seen"], + { + "nickname": "未知用户", + "impression": "", + "know_times": 0, + "last_seen": 0 + } + ) + + return { + "person_id": person_id, + "nickname": user_info["nickname"], + "impression": user_info["impression"], + "know_times": user_info["know_times"], + "last_seen": user_info["last_seen"] + } +``` + +### 2. 在Action中使用用户信息 + +```python +from src.plugin_system.base import BaseAction + +class PersonalizedAction(BaseAction): + async def execute(self, action_data, chat_stream): + # 获取发送者信息 + user_id = chat_stream.user_info.user_id + platform = chat_stream.platform + + # 获取person_id + person_id = person_api.get_person_id(platform, user_id) + + # 获取用户昵称和印象 + nickname = await person_api.get_person_value(person_id, "nickname", "朋友") + impression = await person_api.get_person_value(person_id, "impression", "") + + # 根据用户信息个性化回复 + if impression: + response = f"你好 {nickname}!根据我对你的了解:{impression}" + else: + response = f"你好 {nickname}!很高兴见到你。" + + return { + "success": True, + "response": response, + "user_info": { + "nickname": nickname, + "impression": impression + } + } +``` + +### 3. 用户识别和欢迎 + +```python +async def welcome_user(chat_stream): + """欢迎用户,区分新老用户""" + + user_id = chat_stream.user_info.user_id + platform = chat_stream.platform + + # 检查是否认识这个用户 + is_known = await person_api.is_person_known(platform, user_id) + + if is_known: + # 老用户,获取详细信息 + person_id = person_api.get_person_id(platform, user_id) + nickname = await person_api.get_person_value(person_id, "nickname", "老朋友") + know_times = await person_api.get_person_value(person_id, "know_times", 0) + + welcome_msg = f"欢迎回来,{nickname}!我们已经聊过 {know_times} 次了。" + else: + # 新用户 + welcome_msg = "你好!很高兴认识你,我是MaiBot。" + + return welcome_msg +``` + +### 4. 用户搜索功能 + +```python +async def find_user_by_name(name: str): + """根据名字查找用户""" + + person_id = person_api.get_person_id_by_name(name) + + if not person_id: + return {"found": False, "message": f"未找到名为 '{name}' 的用户"} + + # 获取用户详细信息 + user_info = await person_api.get_person_values( + person_id, + ["nickname", "platform", "user_id", "impression", "know_times"], + {} + ) + + return { + "found": True, + "person_id": person_id, + "info": user_info + } +``` + +### 5. 用户印象分析 + +```python +async def analyze_user_relationship(chat_stream): + """分析用户关系""" + + user_id = chat_stream.user_info.user_id + platform = chat_stream.platform + person_id = person_api.get_person_id(platform, user_id) + + # 获取关系相关信息 + relationship_info = await person_api.get_person_values( + person_id, + ["nickname", "impression", "know_times", "relationship_level", "last_interaction"], + { + "nickname": "未知", + "impression": "", + "know_times": 0, + "relationship_level": "stranger", + "last_interaction": 0 + } + ) + + # 分析关系程度 + know_times = relationship_info["know_times"] + if know_times == 0: + relationship = "陌生人" + elif know_times < 5: + relationship = "新朋友" + elif know_times < 20: + relationship = "熟人" + else: + relationship = "老朋友" + + return { + "nickname": relationship_info["nickname"], + "relationship": relationship, + "impression": relationship_info["impression"], + "interaction_count": know_times + } +``` + +## 常用字段说明 + +### 基础信息字段 +- `nickname`:用户昵称 +- `platform`:平台信息 +- `user_id`:用户ID + +### 关系信息字段 +- `impression`:对用户的印象 +- `know_times`:交互次数 +- `relationship_level`:关系等级 +- `last_seen`:最后见面时间 +- `last_interaction`:最后交互时间 + +### 个性化字段 +- `preferences`:用户偏好 +- `interests`:兴趣爱好 +- `mood_history`:情绪历史 +- `topic_interests`:话题兴趣 + +## 最佳实践 + +### 1. 错误处理 +```python +async def safe_get_user_info(person_id: str, field: str): + """安全获取用户信息""" + try: + value = await person_api.get_person_value(person_id, field) + return value if value is not None else "未设置" + except Exception as e: + logger.error(f"获取用户信息失败: {e}") + return "获取失败" +``` + +### 2. 批量操作 +```python +async def get_complete_user_profile(person_id: str): + """获取完整用户档案""" + + # 一次性获取所有需要的字段 + fields = [ + "nickname", "impression", "know_times", + "preferences", "interests", "relationship_level" + ] + + defaults = { + "nickname": "用户", + "impression": "", + "know_times": 0, + "preferences": "{}", + "interests": "[]", + "relationship_level": "stranger" + } + + profile = await person_api.get_person_values(person_id, fields, defaults) + + # 处理JSON字段 + try: + profile["preferences"] = json.loads(profile["preferences"]) + profile["interests"] = json.loads(profile["interests"]) + except: + profile["preferences"] = {} + profile["interests"] = [] + + return profile +``` + +## 注意事项 + +1. **异步操作**:大部分查询函数都是异步的,需要使用`await` +2. **错误处理**:所有函数都有错误处理,失败时记录日志并返回默认值 +3. **数据类型**:返回的数据可能是字符串、数字或JSON,需要适当处理 +4. **性能考虑**:批量查询优于单个查询 +5. **隐私保护**:确保用户信息的使用符合隐私政策 +6. **数据一致性**:person_id是用户的唯一标识,应妥善保存和使用 \ No newline at end of file diff --git a/docs/plugins/api/send-api.md b/docs/plugins/api/send-api.md new file mode 100644 index 000000000..79335c61a --- /dev/null +++ b/docs/plugins/api/send-api.md @@ -0,0 +1,368 @@ +# 消息发送API + +消息发送API模块专门负责发送各种类型的消息,支持文本、表情包、图片等多种消息类型。 + +## 导入方式 + +```python +from src.plugin_system.apis import send_api +``` + +## 主要功能 + +### 1. 文本消息发送 + +#### `text_to_group(text, group_id, platform="qq", typing=False, reply_to="", storage_message=True)` +向群聊发送文本消息 + +**参数:** +- `text`:要发送的文本内容 +- `group_id`:群聊ID +- `platform`:平台,默认为"qq" +- `typing`:是否显示正在输入 +- `reply_to`:回复消息的格式,如"发送者:消息内容" +- `storage_message`:是否存储到数据库 + +**返回:** +- `bool`:是否发送成功 + +#### `text_to_user(text, user_id, platform="qq", typing=False, reply_to="", storage_message=True)` +向用户发送私聊文本消息 + +**参数与返回值同上** + +### 2. 表情包发送 + +#### `emoji_to_group(emoji_base64, group_id, platform="qq", storage_message=True)` +向群聊发送表情包 + +**参数:** +- `emoji_base64`:表情包的base64编码 +- `group_id`:群聊ID +- `platform`:平台,默认为"qq" +- `storage_message`:是否存储到数据库 + +#### `emoji_to_user(emoji_base64, user_id, platform="qq", storage_message=True)` +向用户发送表情包 + +### 3. 图片发送 + +#### `image_to_group(image_base64, group_id, platform="qq", storage_message=True)` +向群聊发送图片 + +#### `image_to_user(image_base64, user_id, platform="qq", storage_message=True)` +向用户发送图片 + +### 4. 命令发送 + +#### `command_to_group(command, group_id, platform="qq", storage_message=True)` +向群聊发送命令 + +#### `command_to_user(command, user_id, platform="qq", storage_message=True)` +向用户发送命令 + +### 5. 自定义消息发送 + +#### `custom_to_group(message_type, content, group_id, platform="qq", display_message="", typing=False, reply_to="", storage_message=True)` +向群聊发送自定义类型消息 + +#### `custom_to_user(message_type, content, user_id, platform="qq", display_message="", typing=False, reply_to="", storage_message=True)` +向用户发送自定义类型消息 + +#### `custom_message(message_type, content, target_id, is_group=True, platform="qq", display_message="", typing=False, reply_to="", storage_message=True)` +通用的自定义消息发送 + +**参数:** +- `message_type`:消息类型,如"text"、"image"、"emoji"等 +- `content`:消息内容 +- `target_id`:目标ID(群ID或用户ID) +- `is_group`:是否为群聊 +- `platform`:平台 +- `display_message`:显示消息 +- `typing`:是否显示正在输入 +- `reply_to`:回复消息 +- `storage_message`:是否存储 + +## 使用示例 + +### 1. 基础文本发送 + +```python +from src.plugin_system.apis import send_api + +async def send_hello(chat_stream): + """发送问候消息""" + + if chat_stream.group_info: + # 群聊 + success = await send_api.text_to_group( + text="大家好!", + group_id=chat_stream.group_info.group_id, + typing=True + ) + else: + # 私聊 + success = await send_api.text_to_user( + text="你好!", + user_id=chat_stream.user_info.user_id, + typing=True + ) + + return success +``` + +### 2. 回复特定消息 + +```python +async def reply_to_message(chat_stream, reply_text, original_sender, original_message): + """回复特定消息""" + + # 构建回复格式 + reply_to = f"{original_sender}:{original_message}" + + if chat_stream.group_info: + success = await send_api.text_to_group( + text=reply_text, + group_id=chat_stream.group_info.group_id, + reply_to=reply_to + ) + else: + success = await send_api.text_to_user( + text=reply_text, + user_id=chat_stream.user_info.user_id, + reply_to=reply_to + ) + + return success +``` + +### 3. 发送表情包 + +```python +async def send_emoji_reaction(chat_stream, emotion): + """根据情感发送表情包""" + + from src.plugin_system.apis import emoji_api + + # 获取表情包 + emoji_result = await emoji_api.get_by_emotion(emotion) + if not emoji_result: + return False + + emoji_base64, description, matched_emotion = emoji_result + + # 发送表情包 + if chat_stream.group_info: + success = await send_api.emoji_to_group( + emoji_base64=emoji_base64, + group_id=chat_stream.group_info.group_id + ) + else: + success = await send_api.emoji_to_user( + emoji_base64=emoji_base64, + user_id=chat_stream.user_info.user_id + ) + + return success +``` + +### 4. 在Action中发送消息 + +```python +from src.plugin_system.base import BaseAction + +class MessageAction(BaseAction): + async def execute(self, action_data, chat_stream): + message_type = action_data.get("type", "text") + content = action_data.get("content", "") + + if message_type == "text": + success = await self.send_text(chat_stream, content) + elif message_type == "emoji": + success = await self.send_emoji(chat_stream, content) + elif message_type == "image": + success = await self.send_image(chat_stream, content) + else: + success = False + + return {"success": success} + + async def send_text(self, chat_stream, text): + if chat_stream.group_info: + return await send_api.text_to_group(text, chat_stream.group_info.group_id) + else: + return await send_api.text_to_user(text, chat_stream.user_info.user_id) + + async def send_emoji(self, chat_stream, emoji_base64): + if chat_stream.group_info: + return await send_api.emoji_to_group(emoji_base64, chat_stream.group_info.group_id) + else: + return await send_api.emoji_to_user(emoji_base64, chat_stream.user_info.user_id) + + async def send_image(self, chat_stream, image_base64): + if chat_stream.group_info: + return await send_api.image_to_group(image_base64, chat_stream.group_info.group_id) + else: + return await send_api.image_to_user(image_base64, chat_stream.user_info.user_id) +``` + +### 5. 批量发送消息 + +```python +async def broadcast_message(message: str, target_groups: list): + """向多个群组广播消息""" + + results = {} + + for group_id in target_groups: + try: + success = await send_api.text_to_group( + text=message, + group_id=group_id, + typing=True + ) + results[group_id] = success + except Exception as e: + results[group_id] = False + print(f"发送到群 {group_id} 失败: {e}") + + return results +``` + +### 6. 智能消息发送 + +```python +async def smart_send(chat_stream, message_data): + """智能发送不同类型的消息""" + + message_type = message_data.get("type", "text") + content = message_data.get("content", "") + options = message_data.get("options", {}) + + # 根据聊天流类型选择发送方法 + target_id = (chat_stream.group_info.group_id if chat_stream.group_info + else chat_stream.user_info.user_id) + is_group = chat_stream.group_info is not None + + # 使用通用发送方法 + success = await send_api.custom_message( + message_type=message_type, + content=content, + target_id=target_id, + is_group=is_group, + typing=options.get("typing", False), + reply_to=options.get("reply_to", ""), + display_message=options.get("display_message", "") + ) + + return success +``` + +## 消息类型说明 + +### 支持的消息类型 +- `"text"`:纯文本消息 +- `"emoji"`:表情包消息 +- `"image"`:图片消息 +- `"command"`:命令消息 +- `"video"`:视频消息(如果支持) +- `"audio"`:音频消息(如果支持) + +### 回复格式 +回复消息使用格式:`"发送者:消息内容"` 或 `"发送者:消息内容"` + +系统会自动查找匹配的原始消息并进行回复。 + +## 高级用法 + +### 1. 消息发送队列 + +```python +import asyncio + +class MessageQueue: + def __init__(self): + self.queue = asyncio.Queue() + self.running = False + + async def add_message(self, chat_stream, message_type, content, options=None): + """添加消息到队列""" + message_item = { + "chat_stream": chat_stream, + "type": message_type, + "content": content, + "options": options or {} + } + await self.queue.put(message_item) + + async def process_queue(self): + """处理消息队列""" + self.running = True + + while self.running: + try: + message_item = await asyncio.wait_for(self.queue.get(), timeout=1.0) + + # 发送消息 + success = await smart_send( + message_item["chat_stream"], + { + "type": message_item["type"], + "content": message_item["content"], + "options": message_item["options"] + } + ) + + # 标记任务完成 + self.queue.task_done() + + # 发送间隔 + await asyncio.sleep(0.5) + + except asyncio.TimeoutError: + continue + except Exception as e: + print(f"处理消息队列出错: {e}") +``` + +### 2. 消息模板系统 + +```python +class MessageTemplate: + def __init__(self): + self.templates = { + "welcome": "欢迎 {nickname} 加入群聊!", + "goodbye": "{nickname} 离开了群聊。", + "notification": "🔔 通知:{message}", + "error": "❌ 错误:{error_message}", + "success": "✅ 成功:{message}" + } + + def format_message(self, template_name: str, **kwargs) -> str: + """格式化消息模板""" + template = self.templates.get(template_name, "{message}") + return template.format(**kwargs) + + async def send_template(self, chat_stream, template_name: str, **kwargs): + """发送模板消息""" + message = self.format_message(template_name, **kwargs) + + if chat_stream.group_info: + return await send_api.text_to_group(message, chat_stream.group_info.group_id) + else: + return await send_api.text_to_user(message, chat_stream.user_info.user_id) + +# 使用示例 +template_system = MessageTemplate() +await template_system.send_template(chat_stream, "welcome", nickname="张三") +``` + +## 注意事项 + +1. **异步操作**:所有发送函数都是异步的,必须使用`await` +2. **错误处理**:发送失败时返回False,成功时返回True +3. **发送频率**:注意控制发送频率,避免被平台限制 +4. **内容限制**:注意平台对消息内容和长度的限制 +5. **权限检查**:确保机器人有发送消息的权限 +6. **编码格式**:图片和表情包需要使用base64编码 +7. **存储选项**:可以选择是否将发送的消息存储到数据库 \ No newline at end of file diff --git a/docs/plugins/api/utils-api.md b/docs/plugins/api/utils-api.md new file mode 100644 index 000000000..bbab092e6 --- /dev/null +++ b/docs/plugins/api/utils-api.md @@ -0,0 +1,435 @@ +# 工具API + +工具API模块提供了各种辅助功能,包括文件操作、时间处理、唯一ID生成等常用工具函数。 + +## 导入方式 + +```python +from src.plugin_system.apis import utils_api +``` + +## 主要功能 + +### 1. 文件操作 + +#### `get_plugin_path(caller_frame=None) -> str` +获取调用者插件的路径 + +**参数:** +- `caller_frame`:调用者的栈帧,默认为None(自动获取) + +**返回:** +- `str`:插件目录的绝对路径 + +**示例:** +```python +plugin_path = utils_api.get_plugin_path() +print(f"插件路径: {plugin_path}") +``` + +#### `read_json_file(file_path: str, default: Any = None) -> Any` +读取JSON文件 + +**参数:** +- `file_path`:文件路径,可以是相对于插件目录的路径 +- `default`:如果文件不存在或读取失败时返回的默认值 + +**返回:** +- `Any`:JSON数据或默认值 + +**示例:** +```python +# 读取插件配置文件 +config = utils_api.read_json_file("config.json", {}) +settings = utils_api.read_json_file("data/settings.json", {"enabled": True}) +``` + +#### `write_json_file(file_path: str, data: Any, indent: int = 2) -> bool` +写入JSON文件 + +**参数:** +- `file_path`:文件路径,可以是相对于插件目录的路径 +- `data`:要写入的数据 +- `indent`:JSON缩进 + +**返回:** +- `bool`:是否写入成功 + +**示例:** +```python +data = {"name": "test", "value": 123} +success = utils_api.write_json_file("output.json", data) +``` + +### 2. 时间相关 + +#### `get_timestamp() -> int` +获取当前时间戳 + +**返回:** +- `int`:当前时间戳(秒) + +#### `format_time(timestamp: Optional[int] = None, format_str: str = "%Y-%m-%d %H:%M:%S") -> str` +格式化时间 + +**参数:** +- `timestamp`:时间戳,如果为None则使用当前时间 +- `format_str`:时间格式字符串 + +**返回:** +- `str`:格式化后的时间字符串 + +#### `parse_time(time_str: str, format_str: str = "%Y-%m-%d %H:%M:%S") -> int` +解析时间字符串为时间戳 + +**参数:** +- `time_str`:时间字符串 +- `format_str`:时间格式字符串 + +**返回:** +- `int`:时间戳(秒) + +### 3. 其他工具 + +#### `generate_unique_id() -> str` +生成唯一ID + +**返回:** +- `str`:唯一ID + +## 使用示例 + +### 1. 插件数据管理 + +```python +from src.plugin_system.apis import utils_api + +class DataPlugin(BasePlugin): + def __init__(self): + self.plugin_path = utils_api.get_plugin_path() + self.data_file = "plugin_data.json" + self.load_data() + + def load_data(self): + """加载插件数据""" + default_data = { + "users": {}, + "settings": {"enabled": True}, + "stats": {"message_count": 0} + } + self.data = utils_api.read_json_file(self.data_file, default_data) + + def save_data(self): + """保存插件数据""" + return utils_api.write_json_file(self.data_file, self.data) + + async def handle_action(self, action_data, chat_stream): + # 更新统计信息 + self.data["stats"]["message_count"] += 1 + self.data["stats"]["last_update"] = utils_api.get_timestamp() + + # 保存数据 + if self.save_data(): + return {"success": True, "message": "数据已保存"} + else: + return {"success": False, "message": "数据保存失败"} +``` + +### 2. 日志记录系统 + +```python +class PluginLogger: + def __init__(self, plugin_name: str): + self.plugin_name = plugin_name + self.log_file = f"{plugin_name}_log.json" + self.logs = utils_api.read_json_file(self.log_file, []) + + def log_event(self, event_type: str, message: str, data: dict = None): + """记录事件""" + log_entry = { + "id": utils_api.generate_unique_id(), + "timestamp": utils_api.get_timestamp(), + "formatted_time": utils_api.format_time(), + "event_type": event_type, + "message": message, + "data": data or {} + } + + self.logs.append(log_entry) + + # 保持最新的100条记录 + if len(self.logs) > 100: + self.logs = self.logs[-100:] + + # 保存到文件 + utils_api.write_json_file(self.log_file, self.logs) + + def get_logs_by_type(self, event_type: str) -> list: + """获取指定类型的日志""" + return [log for log in self.logs if log["event_type"] == event_type] + + def get_recent_logs(self, count: int = 10) -> list: + """获取最近的日志""" + return self.logs[-count:] + +# 使用示例 +logger = PluginLogger("my_plugin") +logger.log_event("user_action", "用户发送了消息", {"user_id": "123", "message": "hello"}) +``` + +### 3. 配置管理系统 + +```python +class ConfigManager: + def __init__(self, config_file: str = "plugin_config.json"): + self.config_file = config_file + self.default_config = { + "enabled": True, + "debug": False, + "max_users": 100, + "response_delay": 1.0, + "features": { + "auto_reply": True, + "logging": True + } + } + self.config = self.load_config() + + def load_config(self) -> dict: + """加载配置""" + return utils_api.read_json_file(self.config_file, self.default_config) + + def save_config(self) -> bool: + """保存配置""" + return utils_api.write_json_file(self.config_file, self.config, indent=4) + + def get(self, key: str, default=None): + """获取配置值,支持嵌套访问""" + keys = key.split('.') + value = self.config + + for k in keys: + if isinstance(value, dict) and k in value: + value = value[k] + else: + return default + + return value + + def set(self, key: str, value): + """设置配置值,支持嵌套设置""" + keys = key.split('.') + config = self.config + + for k in keys[:-1]: + if k not in config: + config[k] = {} + config = config[k] + + config[keys[-1]] = value + + def update_config(self, updates: dict): + """批量更新配置""" + def deep_update(base, updates): + for key, value in updates.items(): + if isinstance(value, dict) and key in base and isinstance(base[key], dict): + deep_update(base[key], value) + else: + base[key] = value + + deep_update(self.config, updates) + +# 使用示例 +config = ConfigManager() +print(f"调试模式: {config.get('debug', False)}") +print(f"自动回复: {config.get('features.auto_reply', True)}") + +config.set('features.new_feature', True) +config.save_config() +``` + +### 4. 缓存系统 + +```python +class PluginCache: + def __init__(self, cache_file: str = "plugin_cache.json", ttl: int = 3600): + self.cache_file = cache_file + self.ttl = ttl # 缓存过期时间(秒) + self.cache = self.load_cache() + + def load_cache(self) -> dict: + """加载缓存""" + return utils_api.read_json_file(self.cache_file, {}) + + def save_cache(self): + """保存缓存""" + return utils_api.write_json_file(self.cache_file, self.cache) + + def get(self, key: str): + """获取缓存值""" + if key not in self.cache: + return None + + item = self.cache[key] + current_time = utils_api.get_timestamp() + + # 检查是否过期 + if current_time - item["timestamp"] > self.ttl: + del self.cache[key] + return None + + return item["value"] + + def set(self, key: str, value): + """设置缓存值""" + self.cache[key] = { + "value": value, + "timestamp": utils_api.get_timestamp() + } + self.save_cache() + + def clear_expired(self): + """清理过期缓存""" + current_time = utils_api.get_timestamp() + expired_keys = [] + + for key, item in self.cache.items(): + if current_time - item["timestamp"] > self.ttl: + expired_keys.append(key) + + for key in expired_keys: + del self.cache[key] + + if expired_keys: + self.save_cache() + + return len(expired_keys) + +# 使用示例 +cache = PluginCache(ttl=1800) # 30分钟过期 +cache.set("user_data_123", {"name": "张三", "score": 100}) +user_data = cache.get("user_data_123") +``` + +### 5. 时间处理工具 + +```python +class TimeHelper: + @staticmethod + def get_time_info(): + """获取当前时间的详细信息""" + timestamp = utils_api.get_timestamp() + return { + "timestamp": timestamp, + "datetime": utils_api.format_time(timestamp), + "date": utils_api.format_time(timestamp, "%Y-%m-%d"), + "time": utils_api.format_time(timestamp, "%H:%M:%S"), + "year": utils_api.format_time(timestamp, "%Y"), + "month": utils_api.format_time(timestamp, "%m"), + "day": utils_api.format_time(timestamp, "%d"), + "weekday": utils_api.format_time(timestamp, "%A") + } + + @staticmethod + def time_ago(timestamp: int) -> str: + """计算时间差""" + current = utils_api.get_timestamp() + diff = current - timestamp + + if diff < 60: + return f"{diff}秒前" + elif diff < 3600: + return f"{diff // 60}分钟前" + elif diff < 86400: + return f"{diff // 3600}小时前" + else: + return f"{diff // 86400}天前" + + @staticmethod + def parse_duration(duration_str: str) -> int: + """解析时间段字符串,返回秒数""" + import re + + pattern = r'(\d+)([smhd])' + matches = re.findall(pattern, duration_str.lower()) + + total_seconds = 0 + for value, unit in matches: + value = int(value) + if unit == 's': + total_seconds += value + elif unit == 'm': + total_seconds += value * 60 + elif unit == 'h': + total_seconds += value * 3600 + elif unit == 'd': + total_seconds += value * 86400 + + return total_seconds + +# 使用示例 +time_info = TimeHelper.get_time_info() +print(f"当前时间: {time_info['datetime']}") + +last_seen = 1699000000 +print(f"最后见面: {TimeHelper.time_ago(last_seen)}") + +duration = TimeHelper.parse_duration("1h30m") # 1小时30分钟 = 5400秒 +``` + +## 最佳实践 + +### 1. 错误处理 +```python +def safe_file_operation(file_path: str, data: dict): + """安全的文件操作""" + try: + success = utils_api.write_json_file(file_path, data) + if not success: + logger.warning(f"文件写入失败: {file_path}") + return success + except Exception as e: + logger.error(f"文件操作出错: {e}") + return False +``` + +### 2. 路径处理 +```python +import os + +def get_data_path(filename: str) -> str: + """获取数据文件的完整路径""" + plugin_path = utils_api.get_plugin_path() + data_dir = os.path.join(plugin_path, "data") + + # 确保数据目录存在 + os.makedirs(data_dir, exist_ok=True) + + return os.path.join(data_dir, filename) +``` + +### 3. 定期清理 +```python +async def cleanup_old_files(): + """清理旧文件""" + plugin_path = utils_api.get_plugin_path() + current_time = utils_api.get_timestamp() + + for filename in os.listdir(plugin_path): + if filename.endswith('.tmp'): + file_path = os.path.join(plugin_path, filename) + file_time = os.path.getmtime(file_path) + + # 删除超过24小时的临时文件 + if current_time - file_time > 86400: + os.remove(file_path) +``` + +## 注意事项 + +1. **相对路径**:文件路径支持相对于插件目录的路径 +2. **自动创建目录**:写入文件时会自动创建必要的目录 +3. **错误处理**:所有函数都有错误处理,失败时返回默认值 +4. **编码格式**:文件读写使用UTF-8编码 +5. **时间格式**:时间戳使用秒为单位 +6. **JSON格式**:JSON文件使用可读性好的缩进格式 \ No newline at end of file diff --git a/docs/plugins/quick-start.md b/docs/plugins/quick-start.md index 9386d6920..03b6546da 100644 --- a/docs/plugins/quick-start.md +++ b/docs/plugins/quick-start.md @@ -123,7 +123,7 @@ class HelloAction(BaseAction): """执行问候动作 - 这是核心功能""" # 发送问候消息 greeting_message = self.action_data.get("greeting_message","") - + message = "嗨!很开心见到你!😊" + greeting_message await self.send_text(message) @@ -229,7 +229,7 @@ class ByeAction(BaseAction): async def execute(self) -> Tuple[bool, str]: bye_message = self.action_data.get("bye_message","") - + message = "再见!期待下次聊天!👋" + bye_message await self.send_text(message) return True, "发送了告别消息" @@ -474,53 +474,6 @@ async def execute(self) -> Tuple[bool, str]: 注意:配置文件是自动生成的,不要手动创建! ``` -## 🎯 你学会了什么 - -恭喜!你刚刚从零开始创建了一个完整的MaiCore插件!让我们回顾一下: - -### 核心概念 - -- **插件(Plugin)**: 包含多个功能组件的集合 -- **Action组件**: 智能动作,由麦麦根据情境自动选择使用 -- **Command组件**: 直接响应用户命令的功能 -- **配置Schema**: 定义配置结构,系统自动生成配置文件 - -### 开发流程 - -1. ✅ 创建最简单的插件框架 -2. ✅ 添加Action -3. ✅ 理解激活系统的工作原理 -4. ✅ 尝试KEYWORD激活的Action(进阶) -5. ✅ 添加Command组件 -6. ✅ 可选定义配置Schema -7. ✅ 测试完整功能 - -## 📚 进阶学习 - -现在你已经掌握了基础,可以继续深入学习: - -1. **掌握更多Action功能** 📖 [Action组件详解](action-components.md) - - - 学习不同的激活方式 - - 了解Action的生命周期 - - 掌握参数传递 -2. **学会配置管理** ⚙️ [插件配置定义指南](configuration-guide.md) - - - 定义配置Schema - - 自动生成配置文件 - - 配置验证和类型检查 -3. **深入Command系统** 📖 [Command组件详解](command-components.md) - - - 复杂正则表达式 - - 参数提取和处理 - - 错误处理 -4. **掌握API系统** 📖 [新API使用指南](examples/replyer_api_usage.md) - - - replyer_1智能生成 - - 高级消息处理 - - 表情和媒体发送 - -祝你插件开发愉快!🎉 ``` diff --git a/plugins/hello_world_plugin/plugin.py b/plugins/hello_world_plugin/plugin.py index ad5f0f748..d517df210 100644 --- a/plugins/hello_world_plugin/plugin.py +++ b/plugins/hello_world_plugin/plugin.py @@ -1,8 +1,7 @@ from typing import List, Tuple, Type from src.plugin_system import ( BasePlugin, register_plugin, BaseAction, BaseCommand, - ComponentInfo, ActionActivationType, ChatMode, - ConfigField + ComponentInfo, ActionActivationType, ConfigField ) # ===== Action组件 ===== diff --git a/src/plugin_system/apis/send_api.py b/src/plugin_system/apis/send_api.py index 6ccede4ff..c64736a0a 100644 --- a/src/plugin_system/apis/send_api.py +++ b/src/plugin_system/apis/send_api.py @@ -2,10 +2,20 @@ 发送API模块 专门负责发送各种类型的消息,采用标准Python包设计模式 + 使用方式: from src.plugin_system.apis import send_api + + # 方式1:直接使用stream_id(推荐) + await send_api.text_to_stream("hello", stream_id) + await send_api.emoji_to_stream(emoji_base64, stream_id) + await send_api.custom_to_stream("video", video_data, stream_id) + + # 方式2:使用群聊/私聊指定函数 await send_api.text_to_group("hello", "123456") - await send_api.emoji_to_group(emoji_base64, "123456") + await send_api.text_to_user("hello", "987654") + + # 方式3:使用通用custom_message函数 await send_api.custom_message("video", video_data, "123456", True) """ @@ -224,6 +234,96 @@ async def _find_reply_message(target_stream, reply_to: str) -> Optional[MessageR # ============================================================================= +async def text_to_stream( + text: str, + stream_id: str, + typing: bool = False, + reply_to: str = "", + storage_message: bool = True, +) -> bool: + """向指定流发送文本消息 + + Args: + text: 要发送的文本内容 + stream_id: 聊天流ID + typing: 是否显示正在输入 + reply_to: 回复消息,格式为"发送者:消息内容" + storage_message: 是否存储消息到数据库 + + Returns: + bool: 是否发送成功 + """ + return await _send_to_target("text", text, stream_id, "", typing, reply_to, storage_message) + + +async def emoji_to_stream(emoji_base64: str, stream_id: str, storage_message: bool = True) -> bool: + """向指定流发送表情包 + + Args: + emoji_base64: 表情包的base64编码 + stream_id: 聊天流ID + storage_message: 是否存储消息到数据库 + + Returns: + bool: 是否发送成功 + """ + return await _send_to_target("emoji", emoji_base64, stream_id, "", typing=False, storage_message=storage_message) + + +async def image_to_stream(image_base64: str, stream_id: str, storage_message: bool = True) -> bool: + """向指定流发送图片 + + Args: + image_base64: 图片的base64编码 + stream_id: 聊天流ID + storage_message: 是否存储消息到数据库 + + Returns: + bool: 是否发送成功 + """ + return await _send_to_target("image", image_base64, stream_id, "", typing=False, storage_message=storage_message) + + +async def command_to_stream(command: str, stream_id: str, storage_message: bool = True) -> bool: + """向指定流发送命令 + + Args: + command: 命令 + stream_id: 聊天流ID + storage_message: 是否存储消息到数据库 + + Returns: + bool: 是否发送成功 + """ + return await _send_to_target("command", command, stream_id, "", typing=False, storage_message=storage_message) + + +async def custom_to_stream( + message_type: str, + content: str, + stream_id: str, + display_message: str = "", + typing: bool = False, + reply_to: str = "", + storage_message: bool = True, +) -> bool: + """向指定流发送自定义类型消息 + + Args: + message_type: 消息类型,如"text"、"image"、"emoji"、"video"、"file"等 + content: 消息内容(通常是base64编码或文本) + stream_id: 聊天流ID + display_message: 显示消息 + typing: 是否显示正在输入 + reply_to: 回复消息,格式为"发送者:消息内容" + storage_message: 是否存储消息到数据库 + + Returns: + bool: 是否发送成功 + """ + return await _send_to_target(message_type, content, stream_id, display_message, typing, reply_to, storage_message) + + async def text_to_group( text: str, group_id: str, diff --git a/src/plugin_system/base/base_action.py b/src/plugin_system/base/base_action.py index b0cd2d194..6432f5645 100644 --- a/src/plugin_system/base/base_action.py +++ b/src/plugin_system/base/base_action.py @@ -204,22 +204,20 @@ class BaseAction(ABC): Args: content: 文本内容 + reply_to: 回复消息,格式为"发送者:消息内容" Returns: bool: 是否发送成功 """ - if not self.target_id or not self.platform: - logger.error(f"{self.log_prefix} 缺少发送消息所需的信息") + if not self.chat_id: + logger.error(f"{self.log_prefix} 缺少聊天ID") return False - if self.is_group: - return await send_api.text_to_group( - text=content, group_id=self.target_id, platform=self.platform, reply_to=reply_to - ) - else: - return await send_api.text_to_user( - text=content, user_id=self.target_id, platform=self.platform, reply_to=reply_to - ) + return await send_api.text_to_stream( + text=content, + stream_id=self.chat_id, + reply_to=reply_to + ) async def send_emoji(self, emoji_base64: str) -> bool: """发送表情包 @@ -230,17 +228,11 @@ class BaseAction(ABC): Returns: bool: 是否发送成功 """ - # 导入send_api - from src.plugin_system.apis import send_api - - if not self.target_id or not self.platform: - logger.error(f"{self.log_prefix} 缺少发送消息所需的信息") + if not self.chat_id: + logger.error(f"{self.log_prefix} 缺少聊天ID") return False - if self.is_group: - return await send_api.emoji_to_group(emoji_base64, self.target_id, self.platform) - else: - return await send_api.emoji_to_user(emoji_base64, self.target_id, self.platform) + return await send_api.emoji_to_stream(emoji_base64, self.chat_id) async def send_image(self, image_base64: str) -> bool: """发送图片 @@ -251,43 +243,34 @@ class BaseAction(ABC): Returns: bool: 是否发送成功 """ - # 导入send_api - from src.plugin_system.apis import send_api - - if not self.target_id or not self.platform: - logger.error(f"{self.log_prefix} 缺少发送消息所需的信息") + if not self.chat_id: + logger.error(f"{self.log_prefix} 缺少聊天ID") return False - if self.is_group: - return await send_api.image_to_group(image_base64, self.target_id, self.platform) - else: - return await send_api.image_to_user(image_base64, self.target_id, self.platform) + return await send_api.image_to_stream(image_base64, self.chat_id) - async def send_custom(self, message_type: str, content: str, typing: bool = False) -> bool: + async def send_custom(self, message_type: str, content: str, typing: bool = False, reply_to: str = "") -> bool: """发送自定义类型消息 Args: message_type: 消息类型,如"video"、"file"、"audio"等 content: 消息内容 typing: 是否显示正在输入 + reply_to: 回复消息,格式为"发送者:消息内容" Returns: bool: 是否发送成功 """ - # 导入send_api - from src.plugin_system.apis import send_api - - if not self.target_id or not self.platform: - logger.error(f"{self.log_prefix} 缺少发送消息所需的信息") + if not self.chat_id: + logger.error(f"{self.log_prefix} 缺少聊天ID") return False - return await send_api.custom_message( + return await send_api.custom_to_stream( message_type=message_type, content=content, - target_id=self.target_id, - is_group=self.is_group, - platform=self.platform, + stream_id=self.chat_id, typing=typing, + reply_to=reply_to, ) async def store_action_info( @@ -318,36 +301,30 @@ class BaseAction(ABC): ) -> bool: """发送命令消息 - 使用和send_text相同的方式通过MessageAPI发送命令 + 使用stream API发送命令 Args: command_name: 命令名称 args: 命令参数 display_message: 显示消息 + storage_message: 是否存储消息到数据库 Returns: bool: 是否发送成功 """ try: + if not self.chat_id: + logger.error(f"{self.log_prefix} 缺少聊天ID") + return False + # 构造命令数据 command_data = {"name": command_name, "args": args or {}} - if self.is_group: - # 群聊 - success = await send_api.command_to_group( - command=command_data, - group_id=str(self.group_id), - platform=self.platform, - storage_message=storage_message, - ) - else: - # 私聊 - success = await send_api.command_to_user( - command=command_data, - user_id=str(self.user_id), - platform=self.platform, - storage_message=storage_message, - ) + success = await send_api.command_to_stream( + command=command_data, + stream_id=self.chat_id, + storage_message=storage_message, + ) if success: logger.info(f"{self.log_prefix} 成功发送命令: {command_name}") diff --git a/src/plugin_system/base/base_command.py b/src/plugin_system/base/base_command.py index 4c5cb1ee0..50b345fc5 100644 --- a/src/plugin_system/base/base_command.py +++ b/src/plugin_system/base/base_command.py @@ -86,30 +86,30 @@ class BaseCommand(ABC): return current - async def send_text(self, content: str) -> None: + async def send_text(self, content: str, reply_to: str = "") -> bool: """发送回复消息 Args: content: 回复内容 + reply_to: 回复消息,格式为"发送者:消息内容" + + Returns: + bool: 是否发送成功 """ # 获取聊天流信息 chat_stream = self.message.chat_stream + if not chat_stream or not hasattr(chat_stream, 'stream_id'): + logger.error(f"{self.log_prefix} 缺少聊天流或stream_id") + return False - if chat_stream.group_info: - # 群聊 - - await send_api.text_to_group( - text=content, group_id=str(chat_stream.group_info.group_id), platform=chat_stream.platform - ) - else: - # 私聊 - - await send_api.text_to_user( - text=content, user_id=str(chat_stream.user_info.user_id), platform=chat_stream.platform - ) + return await send_api.text_to_stream( + text=content, + stream_id=chat_stream.stream_id, + reply_to=reply_to + ) async def send_type( - self, message_type: str, content: str, display_message: str = None, typing: bool = False + self, message_type: str, content: str, display_message: str = "", typing: bool = False, reply_to: str = "" ) -> bool: """发送指定类型的回复消息到当前聊天环境 @@ -117,78 +117,54 @@ class BaseCommand(ABC): message_type: 消息类型,如"text"、"image"、"emoji"等 content: 消息内容 display_message: 显示消息(可选) + typing: 是否显示正在输入 + reply_to: 回复消息,格式为"发送者:消息内容" Returns: bool: 是否发送成功 """ # 获取聊天流信息 chat_stream = self.message.chat_stream + if not chat_stream or not hasattr(chat_stream, 'stream_id'): + logger.error(f"{self.log_prefix} 缺少聊天流或stream_id") + return False - if chat_stream.group_info: - # 群聊 - from src.plugin_system.apis import send_api + return await send_api.custom_to_stream( + message_type=message_type, + content=content, + stream_id=chat_stream.stream_id, + display_message=display_message, + typing=typing, + reply_to=reply_to, + ) - return await send_api.custom_message( - message_type=message_type, - content=content, - target_id=str(chat_stream.group_info.group_id), - is_group=True, - platform=chat_stream.platform, - typing=typing, - ) - else: - # 私聊 - from src.plugin_system.apis import send_api - - return await send_api.custom_message( - message_type=message_type, - content=content, - target_id=str(chat_stream.user_info.user_id), - is_group=False, - platform=chat_stream.platform, - typing=typing, - ) - - async def send_command(self, command_name: str, args: dict = None, display_message: str = None) -> bool: + async def send_command(self, command_name: str, args: dict = None, display_message: str = "", storage_message: bool = True) -> bool: """发送命令消息 Args: command_name: 命令名称 args: 命令参数 display_message: 显示消息 + storage_message: 是否存储消息到数据库 Returns: bool: 是否发送成功 """ try: + # 获取聊天流信息 + chat_stream = self.message.chat_stream + if not chat_stream or not hasattr(chat_stream, 'stream_id'): + logger.error(f"{self.log_prefix} 缺少聊天流或stream_id") + return False + # 构造命令数据 command_data = {"name": command_name, "args": args or {}} - # 获取聊天流信息 - chat_stream = self.message.chat_stream - - if chat_stream.group_info: - # 群聊 - from src.plugin_system.apis import send_api - - success = await send_api.custom_message( - message_type="command", - content=command_data, - target_id=str(chat_stream.group_info.group_id), - is_group=True, - platform=chat_stream.platform, - ) - else: - # 私聊 - from src.plugin_system.apis import send_api - - success = await send_api.custom_message( - message_type="command", - content=command_data, - target_id=str(chat_stream.user_info.user_id), - is_group=False, - platform=chat_stream.platform, - ) + success = await send_api.command_to_stream( + command=command_data, + stream_id=chat_stream.stream_id, + storage_message=storage_message, + ) if success: logger.info(f"{self.log_prefix} 成功发送命令: {command_name}") @@ -201,6 +177,38 @@ class BaseCommand(ABC): logger.error(f"{self.log_prefix} 发送命令时出错: {e}") return False + async def send_emoji(self, emoji_base64: str) -> bool: + """发送表情包 + + Args: + emoji_base64: 表情包的base64编码 + + Returns: + bool: 是否发送成功 + """ + chat_stream = self.message.chat_stream + if not chat_stream or not hasattr(chat_stream, 'stream_id'): + logger.error(f"{self.log_prefix} 缺少聊天流或stream_id") + return False + + return await send_api.emoji_to_stream(emoji_base64, chat_stream.stream_id) + + async def send_image(self, image_base64: str) -> bool: + """发送图片 + + Args: + image_base64: 图片的base64编码 + + Returns: + bool: 是否发送成功 + """ + chat_stream = self.message.chat_stream + if not chat_stream or not hasattr(chat_stream, 'stream_id'): + logger.error(f"{self.log_prefix} 缺少聊天流或stream_id") + return False + + return await send_api.image_to_stream(image_base64, chat_stream.stream_id) + @classmethod def get_command_info(cls) -> "CommandInfo": """从类属性生成CommandInfo