feat:requirements.txt中添加多个新依赖项(为了适配联网搜索和麦麦空间插件)

优化emoji_manager.py中的数据库查询代码。
This commit is contained in:
minecraft1024a
2025-08-11 21:45:55 +08:00
committed by Windpicker-owo
parent 23ee3767ef
commit c2c895d3ba
12 changed files with 3395 additions and 3 deletions

1
.gitignore vendored
View File

@@ -25,6 +25,7 @@ log_debug/
run_amds.bat
run_none.bat
run.py
cookies-*.json
message_queue_content.txt
message_queue_content.bat
message_queue_window.bat

View File

@@ -49,3 +49,12 @@ scikit-learn
seaborn
structlog
watchdog
httpx>=0.24.0
requests>=2.28.0
beautifulsoup4>=4.11.0
lxml>=4.9.0
json5>=0.9.0
toml>=0.10.0
beautifulsoup4
exa_py
asyncddgs

View File

@@ -655,7 +655,7 @@ class EmojiManager:
self._ensure_db()
logger.debug("[数据库] 开始加载所有表情包记录 ...")
emoji_instances = session.execute(stmt = select(Emoji)).scalars().all()
emoji_instances = session.execute(select(Emoji)).scalars().all()
emoji_objects, load_errors = _to_emoji_objects(emoji_instances)
# 更新内存中的列表和数量
@@ -684,12 +684,12 @@ class EmojiManager:
self._ensure_db()
if emoji_hash:
session.execute(select(Emoji).where(Emoji.emoji_hash == emoji_hash)).scalars().all()
query = session.execute(select(Emoji).where(Emoji.emoji_hash == emoji_hash)).scalars().all()
else:
logger.warning(
"[查询] 未提供 hash将尝试加载所有表情包建议使用 get_all_emoji_from_db 更新管理器状态。"
)
query = session.execute(select(Emoji)).scalars().all()
query = session.execute(select(Emoji)).scalars().all()
emoji_instances = query
emoji_objects, load_errors = _to_emoji_objects(emoji_instances)

View File

View File

@@ -0,0 +1,50 @@
{
"manifest_version": 1,
"name": "MaiZone麦麦空间",
"version": "2.0.0",
"description": "让你的麦麦发QQ空间说说、评论、点赞支持AI配图、定时发送和自动监控功能",
"author": {
"name": "重构版",
"url": "https://github.com/internetsb"
},
"license": "AGPL-v3.0",
"host_application": {
"min_version": "0.8.0",
"max_version": "0.10.0"
},
"homepage_url": "https://github.com/internetsb/Maizone",
"repository_url": "https://github.com/internetsb/Maizone",
"keywords": ["QQ空间", "说说", "动态", "评论", "点赞", "自动化", "AI配图"],
"categories": ["社交", "自动化", "QQ空间"],
"plugin_info": {
"is_built_in": false,
"plugin_type": "social",
"components": [
{
"type": "action",
"name": "send_feed",
"description": "根据指定主题发送一条QQ空间说说"
},
{
"type": "action",
"name": "read_feed",
"description": "读取指定好友最近的说说,并评论点赞"
},
{
"type": "command",
"name": "send_feed",
"description": "通过命令发送QQ空间说说"
}
],
"features": [
"智能生成说说内容",
"AI自动配图硅基流动",
"自动点赞评论好友说说",
"定时发送说说",
"权限管理系统",
"历史记录避重"
]
}
}

View File

@@ -0,0 +1,474 @@
"""
MaiZone插件独立配置文件加载系统
这个模块提供了一个独立的配置文件加载系统用于替代原本插件中的config加载系统。
它支持TOML格式的配置文件具有配置验证、默认值处理、类型转换等功能。
"""
import os
import toml
import shutil
import datetime
from typing import Dict, Any, Union, List, Optional, Type
from dataclasses import dataclass, field
from pathlib import Path
from src.common.logger import get_logger
logger = get_logger("MaiZone.ConfigLoader")
@dataclass
class ConfigFieldSpec:
"""配置字段规格定义"""
name: str
type_hint: Type
default: Any
description: str = ""
required: bool = False
choices: Optional[List[Any]] = None
min_value: Optional[Union[int, float]] = None
max_value: Optional[Union[int, float]] = None
def validate_value(self, value: Any) -> tuple[bool, str]:
"""验证配置值是否符合规格"""
# 类型检查
if not isinstance(value, self.type_hint):
try:
# 尝试类型转换
if self.type_hint == bool and isinstance(value, str):
value = value.lower() in ('true', '1', 'yes', 'on')
else:
value = self.type_hint(value)
except (ValueError, TypeError):
return False, f"类型错误: 期望 {self.type_hint.__name__}, 得到 {type(value).__name__}"
# 选择项检查
if self.choices and value not in self.choices:
return False, f"值不在允许范围内: {self.choices}"
# 数值范围检查
if isinstance(value, (int, float)):
if self.min_value is not None and value < self.min_value:
return False, f"值小于最小值 {self.min_value}"
if self.max_value is not None and value > self.max_value:
return False, f"值大于最大值 {self.max_value}"
return True, ""
@dataclass
class ConfigSectionSpec:
"""配置节规格定义"""
name: str
description: str = ""
fields: Dict[str, ConfigFieldSpec] = field(default_factory=dict)
def add_field(self, field_spec: ConfigFieldSpec):
"""添加字段规格"""
self.fields[field_spec.name] = field_spec
def validate_section(self, section_data: Dict[str, Any]) -> tuple[bool, List[str]]:
"""验证配置节数据"""
errors = []
# 检查必需字段
for field_name, field_spec in self.fields.items():
if field_spec.required and field_name not in section_data:
errors.append(f"缺少必需字段: {field_name}")
# 验证每个字段
for field_name, value in section_data.items():
if field_name in self.fields:
field_spec = self.fields[field_name]
is_valid, error_msg = field_spec.validate_value(value)
if not is_valid:
errors.append(f"{field_name}: {error_msg}")
else:
logger.warning(f"未知配置字段: {self.name}.{field_name}")
return len(errors) == 0, errors
class MaiZoneConfigLoader:
"""MaiZone插件独立配置加载器"""
def __init__(self, plugin_dir: str, config_filename: str = "config.toml"):
"""
初始化配置加载器
Args:
plugin_dir: 插件目录路径
config_filename: 配置文件名
"""
self.plugin_dir = Path(plugin_dir)
self.config_filename = config_filename
self.config_file_path = self.plugin_dir / config_filename
self.config_data: Dict[str, Any] = {}
self.config_specs: Dict[str, ConfigSectionSpec] = {}
self.config_version = "2.0.0"
# 初始化配置规格
self._init_config_specs()
def _init_config_specs(self):
"""初始化配置规格定义"""
# 插件基础配置节
plugin_section = ConfigSectionSpec("plugin", "插件基础配置")
plugin_section.add_field(ConfigFieldSpec("enable", bool, True, "是否启用插件"))
plugin_section.add_field(ConfigFieldSpec("config_version", str, "2.0.0", "配置文件版本"))
plugin_section.add_field(ConfigFieldSpec("http_port", str, "3000", "NapCat HTTP服务器端口号"))
plugin_section.add_field(ConfigFieldSpec("http_host", str, "127.0.0.1", "NapCat HTTP服务器地址"))
self.config_specs["plugin"] = plugin_section
# 模型相关配置节
models_section = ConfigSectionSpec("models", "模型相关配置")
models_section.add_field(ConfigFieldSpec("text_model", str, "replyer_1", "生成文本的模型名称"))
models_section.add_field(ConfigFieldSpec("siliconflow_apikey", str, "", "硅基流动AI生图API密钥"))
self.config_specs["models"] = models_section
# 发送说说配置节
send_section = ConfigSectionSpec("send", "发送说说配置")
send_section.add_field(ConfigFieldSpec("permission", list, ["2488036428"], "发送权限QQ号列表"))
send_section.add_field(ConfigFieldSpec("permission_type", str, "whitelist", "权限类型",
choices=["whitelist", "blacklist"]))
send_section.add_field(ConfigFieldSpec("enable_image", bool, False, "是否启用说说配图"))
send_section.add_field(ConfigFieldSpec("enable_ai_image", bool, False, "是否启用AI生成配图"))
send_section.add_field(ConfigFieldSpec("enable_reply", bool, True, "生成完成时是否发出回复"))
send_section.add_field(ConfigFieldSpec("ai_image_number", int, 1, "AI生成图片数量",
min_value=1, max_value=4))
send_section.add_field(ConfigFieldSpec("image_directory", str, "./plugins/built_in/Maizone/images",
"图片存储目录"))
self.config_specs["send"] = send_section
# 阅读说说配置节
read_section = ConfigSectionSpec("read", "阅读说说配置")
read_section.add_field(ConfigFieldSpec("permission", list, [], "阅读权限QQ号列表"))
read_section.add_field(ConfigFieldSpec("permission_type", str, "blacklist", "权限类型",
choices=["whitelist", "blacklist"]))
read_section.add_field(ConfigFieldSpec("read_number", int, 5, "一次读取的说说数量",
min_value=1, max_value=20))
read_section.add_field(ConfigFieldSpec("like_possibility", float, 1.0, "点赞概率",
min_value=0.0, max_value=1.0))
read_section.add_field(ConfigFieldSpec("comment_possibility", float, 0.3, "评论概率",
min_value=0.0, max_value=1.0))
self.config_specs["read"] = read_section
# 自动监控配置节
monitor_section = ConfigSectionSpec("monitor", "自动监控配置")
monitor_section.add_field(ConfigFieldSpec("enable_auto_monitor", bool, False, "是否启用自动监控好友说说"))
monitor_section.add_field(ConfigFieldSpec("interval_minutes", int, 10, "监控间隔时间(分钟)",
min_value=1, max_value=1440))
self.config_specs["monitor"] = monitor_section
# 定时发送配置节
schedule_section = ConfigSectionSpec("schedule", "定时发送配置")
schedule_section.add_field(ConfigFieldSpec("enable_schedule", bool, False, "是否启用定时发送说说"))
schedule_section.add_field(ConfigFieldSpec("schedules", list, [
{"time": "08:00", "topic": "早安"},
{"time": "22:00", "topic": "晚安"}
], "定时发送任务列表"))
self.config_specs["schedule"] = schedule_section
def load_config(self) -> bool:
"""
加载配置文件
Returns:
bool: 是否成功加载
"""
try:
# 如果配置文件不存在,生成默认配置
if not self.config_file_path.exists():
logger.info(f"配置文件不存在,生成默认配置: {self.config_file_path}")
self._generate_default_config()
# 加载配置文件
with open(self.config_file_path, 'r', encoding='utf-8') as f:
self.config_data = toml.load(f)
logger.info(f"成功加载配置文件: {self.config_file_path}")
# 验证配置
self._validate_config()
# 检查版本并迁移
self._check_and_migrate_config()
return True
except Exception as e:
logger.error(f"加载配置文件失败: {e}")
return False
def _generate_default_config(self):
"""生成默认配置文件"""
try:
# 确保插件目录存在
self.plugin_dir.mkdir(parents=True, exist_ok=True)
# 生成默认配置数据
default_config = {}
for section_name, section_spec in self.config_specs.items():
section_data = {}
for field_name, field_spec in section_spec.fields.items():
section_data[field_name] = field_spec.default
default_config[section_name] = section_data
# 保存到文件
self._save_config_to_file(default_config)
self.config_data = default_config
logger.info(f"默认配置文件已生成: {self.config_file_path}")
except Exception as e:
logger.error(f"生成默认配置文件失败: {e}")
def _save_config_to_file(self, config_data: Dict[str, Any]):
"""保存配置到文件(带注释)"""
toml_content = f"# MaiZone插件配置文件\n"
toml_content += f"# 让你的麦麦发QQ空间说说、评论、点赞支持AI配图、定时发送和自动监控功能\n"
toml_content += f"# 配置版本: {self.config_version}\n\n"
for section_name, section_spec in self.config_specs.items():
if section_name not in config_data:
continue
# 添加节描述
toml_content += f"# {section_spec.description}\n"
toml_content += f"[{section_name}]\n\n"
section_data = config_data[section_name]
for field_name, field_spec in section_spec.fields.items():
if field_name not in section_data:
continue
# 添加字段描述
toml_content += f"# {field_spec.description}\n"
if field_spec.choices:
toml_content += f"# 可选值: {', '.join(map(str, field_spec.choices))}\n"
if field_spec.min_value is not None or field_spec.max_value is not None:
range_str = f"# 范围: "
if field_spec.min_value is not None:
range_str += f"最小值 {field_spec.min_value}"
if field_spec.max_value is not None:
if field_spec.min_value is not None:
range_str += f", 最大值 {field_spec.max_value}"
else:
range_str += f"最大值 {field_spec.max_value}"
toml_content += range_str + "\n"
# 添加字段值
value = section_data[field_name]
if isinstance(value, str):
toml_content += f'{field_name} = "{value}"\n'
elif isinstance(value, bool):
toml_content += f"{field_name} = {str(value).lower()}\n"
elif isinstance(value, list):
# 格式化列表
if all(isinstance(item, str) for item in value):
formatted_list = "[" + ", ".join(f'"{item}"' for item in value) + "]"
elif all(isinstance(item, dict) for item in value):
# 处理字典列表如schedules
formatted_items = []
for item in value:
formatted_items.append("{" + ", ".join(f'{k} = "{v}"' for k, v in item.items()) + "}")
formatted_list = "[\n " + ",\n ".join(formatted_items) + ",\n]"
else:
formatted_list = str(value)
toml_content += f"{field_name} = {formatted_list}\n"
else:
toml_content += f"{field_name} = {value}\n"
toml_content += "\n"
toml_content += "\n"
# 写入文件
with open(self.config_file_path, 'w', encoding='utf-8') as f:
f.write(toml_content)
def _validate_config(self) -> bool:
"""验证配置数据"""
all_valid = True
for section_name, section_spec in self.config_specs.items():
if section_name not in self.config_data:
logger.warning(f"配置文件缺少节: {section_name}")
continue
section_data = self.config_data[section_name]
is_valid, errors = section_spec.validate_section(section_data)
if not is_valid:
logger.error(f"配置节 {section_name} 验证失败:")
for error in errors:
logger.error(f" - {error}")
all_valid = False
return all_valid
def _check_and_migrate_config(self):
"""检查配置版本并进行迁移"""
current_version = self.get_config("plugin.config_version", "1.0.0")
if current_version != self.config_version:
logger.info(f"检测到配置版本变更: {current_version} -> {self.config_version}")
# 备份旧配置
self._backup_config()
# 迁移配置
self._migrate_config(current_version, self.config_version)
# 更新版本号
self.config_data["plugin"]["config_version"] = self.config_version
# 保存迁移后的配置
self._save_config_to_file(self.config_data)
logger.info(f"配置已迁移到版本 {self.config_version}")
def _backup_config(self):
"""备份当前配置文件"""
if self.config_file_path.exists():
timestamp = datetime.datetime.now().strftime("%Y%m%d_%H%M%S")
backup_path = self.config_file_path.with_suffix(f".backup_{timestamp}.toml")
shutil.copy2(self.config_file_path, backup_path)
logger.info(f"配置文件已备份到: {backup_path}")
def _migrate_config(self, from_version: str, to_version: str):
"""迁移配置数据"""
# 创建新的配置结构
new_config = {}
for section_name, section_spec in self.config_specs.items():
new_section = {}
# 复制现有配置值
if section_name in self.config_data:
old_section = self.config_data[section_name]
for field_name, field_spec in section_spec.fields.items():
if field_name in old_section:
new_section[field_name] = old_section[field_name]
else:
new_section[field_name] = field_spec.default
logger.info(f"添加新配置项: {section_name}.{field_name} = {field_spec.default}")
else:
# 新增节,使用默认值
for field_name, field_spec in section_spec.fields.items():
new_section[field_name] = field_spec.default
logger.info(f"添加新配置节: {section_name}")
new_config[section_name] = new_section
self.config_data = new_config
def get_config(self, key: str, default: Any = None) -> Any:
"""
获取配置值,支持嵌套键访问
Args:
key: 配置键名,支持嵌套访问如 "section.field"
default: 默认值
Returns:
Any: 配置值或默认值
"""
keys = key.split('.')
current = self.config_data
for k in keys:
if isinstance(current, dict) and k in current:
current = current[k]
else:
return default
return current
def set_config(self, key: str, value: Any) -> bool:
"""
设置配置值
Args:
key: 配置键名
value: 配置值
Returns:
bool: 是否设置成功
"""
try:
keys = key.split('.')
if len(keys) != 2:
logger.error(f"配置键格式错误: {key},应为 'section.field' 格式")
return False
section_name, field_name = keys
# 检查节是否存在
if section_name not in self.config_specs:
logger.error(f"未知配置节: {section_name}")
return False
# 检查字段是否存在
if field_name not in self.config_specs[section_name].fields:
logger.error(f"未知配置字段: {key}")
return False
# 验证值
field_spec = self.config_specs[section_name].fields[field_name]
is_valid, error_msg = field_spec.validate_value(value)
if not is_valid:
logger.error(f"配置值验证失败 {key}: {error_msg}")
return False
# 设置值
if section_name not in self.config_data:
self.config_data[section_name] = {}
self.config_data[section_name][field_name] = value
logger.debug(f"设置配置: {key} = {value}")
return True
except Exception as e:
logger.error(f"设置配置失败 {key}: {e}")
return False
def save_config(self) -> bool:
"""
保存当前配置到文件
Returns:
bool: 是否保存成功
"""
try:
self._save_config_to_file(self.config_data)
logger.info(f"配置已保存到: {self.config_file_path}")
return True
except Exception as e:
logger.error(f"保存配置失败: {e}")
return False
def reload_config(self) -> bool:
"""
重新加载配置文件
Returns:
bool: 是否重新加载成功
"""
return self.load_config()
def get_config_info(self) -> Dict[str, Any]:
"""
获取配置信息
Returns:
Dict[str, Any]: 配置信息
"""
return {
"config_file": str(self.config_file_path),
"config_version": self.config_version,
"sections": list(self.config_specs.keys()),
"loaded": bool(self.config_data)
}

View File

@@ -0,0 +1,242 @@
import asyncio
import random
import time
import traceback
from typing import List, Dict, Any
from src.common.logger import get_logger
from src.plugin_system.apis import llm_api, config_api
# 导入工具模块
import sys
import os
sys.path.append(os.path.dirname(__file__))
from qzone_utils import QZoneManager
# 获取日志记录器
logger = get_logger('MaiZone-Monitor')
class MonitorManager:
"""监控管理器 - 负责自动监控好友说说并点赞评论"""
def __init__(self, plugin):
"""初始化监控管理器"""
self.plugin = plugin
self.is_running = False
self.task = None
self.last_check_time = 0
logger.info("监控管理器初始化完成")
async def start(self):
"""启动监控任务"""
if self.is_running:
logger.warning("监控任务已在运行中")
return
self.is_running = True
self.task = asyncio.create_task(self._monitor_loop())
logger.info("说说监控任务已启动")
async def stop(self):
"""停止监控任务"""
if not self.is_running:
return
self.is_running = False
if self.task:
self.task.cancel()
try:
await self.task
except asyncio.CancelledError:
logger.info("监控任务已被取消")
logger.info("说说监控任务已停止")
async def _monitor_loop(self):
"""监控任务主循环"""
while self.is_running:
try:
# 获取监控间隔配置
interval_minutes = int(self.plugin.get_config("monitor.interval_minutes", 10) or 10)
# 等待指定时间间隔
await asyncio.sleep(interval_minutes * 60)
# 执行监控检查
await self._check_and_process_feeds()
except asyncio.CancelledError:
logger.info("监控循环被取消")
break
except Exception as e:
logger.error(f"监控任务出错: {str(e)}")
logger.error(traceback.format_exc())
# 出错后等待5分钟再重试
await asyncio.sleep(300)
async def _check_and_process_feeds(self):
"""检查并处理好友说说"""
try:
# 获取配置
qq_account = config_api.get_global_config("bot.qq_account", "")
port = str(self.plugin.get_config("plugin.http_port", "3000"))
host = str(self.plugin.get_config("plugin.http_host", "127.0.0.1"))
read_num = 10 # 监控时读取较少的说说数量
logger.info("监控任务: 开始检查好友说说")
# 创建QZone管理器
qzone_manager = QZoneManager(port, host)
# 获取监控说说列表
feeds_list = await qzone_manager.monitor_read_feed(qq_account, read_num)
if not feeds_list:
logger.info("监控任务: 未发现新说说")
return
logger.info(f"监控任务: 发现 {len(feeds_list)} 条新说说")
# 处理每条说说
for feed in feeds_list:
try:
await self._process_monitor_feed(feed, qzone_manager)
# 每条说说之间随机延迟
await asyncio.sleep(3 + random.random() * 2)
except Exception as e:
logger.error(f"处理监控说说失败: {str(e)}")
except Exception as e:
logger.error(f"监控检查失败: {str(e)}")
async def _process_monitor_feed(self, feed: Dict[str, Any], qzone_manager: QZoneManager):
"""处理单条监控说说"""
try:
# 提取说说信息
target_qq = feed.get("target_qq", "")
tid = feed.get("tid", "")
content = feed.get("content", "")
images = feed.get("images", [])
rt_con = feed.get("rt_con", "")
# 构建完整内容用于显示
full_content = content
if images:
full_content += f" [图片: {len(images)}张]"
if rt_con:
full_content += f" [转发: {rt_con[:20]}...]"
logger.info(f"监控处理说说: {target_qq} - {full_content[:30]}...")
# 获取配置
qq_account = config_api.get_global_config("bot.qq_account", "")
like_possibility = float(self.plugin.get_config("read.like_possibility", 1.0) or 1.0)
comment_possibility = float(self.plugin.get_config("read.comment_possibility", 0.3) or 0.3)
# 随机决定是否评论
if random.random() <= comment_possibility:
comment = await self._generate_monitor_comment(content, rt_con, target_qq)
if comment:
success = await qzone_manager.comment_feed(qq_account, target_qq, tid, comment)
if success:
logger.info(f"监控评论成功: '{comment}'")
else:
logger.error(f"监控评论失败: {content[:20]}...")
# 随机决定是否点赞
if random.random() <= like_possibility:
success = await qzone_manager.like_feed(qq_account, target_qq, tid)
if success:
logger.info(f"监控点赞成功: {content[:20]}...")
else:
logger.error(f"监控点赞失败: {content[:20]}...")
except Exception as e:
logger.error(f"处理监控说说异常: {str(e)}")
async def _generate_monitor_comment(self, content: str, rt_con: str, target_qq: str) -> str:
"""生成监控评论内容"""
try:
# 获取模型配置
models = llm_api.get_available_models()
text_model = str(self.plugin.get_config("models.text_model", "replyer_1"))
model_config = models.get(text_model)
if not model_config:
logger.error("未配置LLM模型")
return ""
# 获取机器人信息
bot_personality = config_api.get_global_config("personality.personality_core", "一个机器人")
bot_expression = config_api.get_global_config("expression.expression_style", "内容积极向上")
# 构建提示词
if not rt_con:
prompt = f"""
你是'{bot_personality}',你正在浏览你好友'{target_qq}'的QQ空间
你看到了你的好友'{target_qq}'qq空间上内容是'{content}'的说说,你想要发表你的一条评论,
{bot_expression},回复的平淡一些,简短一些,说中文,
不要刻意突出自身学科背景,不要浮夸,不要夸张修辞,不要输出多余内容(包括前后缀,冒号和引号,括号()表情包at或 @等 )。只输出回复内容
"""
else:
prompt = f"""
你是'{bot_personality}',你正在浏览你好友'{target_qq}'的QQ空间
你看到了你的好友'{target_qq}'在qq空间上转发了一条内容为'{rt_con}'的说说,你的好友的评论为'{content}'
你想要发表你的一条评论,{bot_expression},回复的平淡一些,简短一些,说中文,
不要刻意突出自身学科背景,不要浮夸,不要夸张修辞,不要输出多余内容(包括前后缀,冒号和引号,括号()表情包at或 @等 )。只输出回复内容
"""
logger.info(f"正在为 {target_qq} 的说说生成评论...")
# 生成评论
success, comment, reasoning, model_name = await llm_api.generate_with_model(
prompt=prompt,
model_config=model_config,
request_type="story.generate",
temperature=0.3,
max_tokens=1000
)
if success:
logger.info(f"成功生成监控评论: '{comment}'")
return comment
else:
logger.error("生成监控评论失败")
return ""
except Exception as e:
logger.error(f"生成监控评论异常: {str(e)}")
return ""
def get_status(self) -> Dict[str, Any]:
"""获取监控状态"""
return {
"is_running": self.is_running,
"interval_minutes": self.plugin.get_config("monitor.interval_minutes", 10),
"last_check_time": self.last_check_time,
"enabled": self.plugin.get_config("monitor.enable_auto_monitor", False)
}
async def manual_check(self) -> Dict[str, Any]:
"""手动执行一次监控检查"""
try:
logger.info("执行手动监控检查")
await self._check_and_process_feeds()
return {
"success": True,
"message": "手动监控检查完成",
"timestamp": time.time()
}
except Exception as e:
logger.error(f"手动监控检查失败: {str(e)}")
return {
"success": False,
"message": f"手动监控检查失败: {str(e)}",
"timestamp": time.time()
}

View File

@@ -0,0 +1,829 @@
import asyncio
import random
import time
import traceback
from typing import List, Tuple, Type, Union, Any, Optional
from src.common.logger import get_logger
from src.plugin_system import (
BasePlugin, register_plugin, BaseAction, BaseCommand,
ComponentInfo, ActionActivationType, ChatMode
)
from src.plugin_system.apis import llm_api, config_api, person_api, generator_api
from src.plugin_system.base.config_types import ConfigField
# 导入插件工具模块
import sys
import os
sys.path.append(os.path.dirname(__file__))
from qzone_utils import (
QZoneManager, generate_image_by_sf, get_send_history
)
from scheduler import ScheduleManager
from config_loader import MaiZoneConfigLoader
# 获取日志记录器
logger = get_logger('MaiZone')
# ===== 发送说说命令组件 =====
class SendFeedCommand(BaseCommand):
"""发送说说命令 - 响应 /send_feed 命令"""
command_name = "send_feed"
command_description = "发送一条QQ空间说说"
command_pattern = r"^/send_feed(?:\s+(?P<topic>\w+))?$"
command_help = "发一条主题为<topic>或随机的说说"
command_examples = ["/send_feed", "/send_feed 日常"]
intercept_message = True
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
# 获取配置加载器引用
self.config_loader = None
self._init_config_loader()
def _init_config_loader(self):
"""初始化配置加载器"""
try:
plugin_dir = os.path.dirname(__file__)
self.config_loader = MaiZoneConfigLoader(plugin_dir)
self.config_loader.load_config()
except Exception as e:
logger.error(f"初始化配置加载器失败: {e}")
def get_config(self, key: str, default=None):
"""获取配置值"""
if self.config_loader:
return self.config_loader.get_config(key, default)
return default
def check_permission(self, qq_account: str) -> bool:
"""检查用户权限"""
permission_list = self.get_config("send.permission", [])
permission_type = self.get_config("send.permission_type", "whitelist")
logger.info(f'权限检查: {permission_type}:{permission_list}')
if not isinstance(permission_list, list):
logger.error("权限列表配置错误")
return False
if permission_type == 'whitelist':
return qq_account in permission_list
elif permission_type == 'blacklist':
return qq_account not in permission_list
else:
logger.error('权限类型配置错误,应为 whitelist 或 blacklist')
return False
async def execute(self) -> Tuple[bool, str, bool]:
"""执行发送说说命令"""
try:
# 获取用户信息
user_id = self.message.message_info.user_info.user_id if self.message and self.message.message_info and self.message.message_info.user_info else None
# 权限检查
if not user_id or not self.check_permission(user_id):
logger.info(f"用户 {user_id} 权限不足")
await self.send_text(f"权限不足,无法使用此命令")
return False, "权限不足", True
# 获取主题
topic = self.matched_groups.get("topic", "")
# 生成说说内容
story = await self._generate_story_content(topic)
if not story:
return False, "生成说说内容失败", True
# 处理图片
await self._handle_images(story)
# 发送说说
success = await self._send_feed(story)
if success:
if self.get_config("send.enable_reply", True):
await self.send_text(f"已发送说说:\n{story}")
return True, "发送成功", True
else:
return False, "发送说说失败", True
except Exception as e:
logger.error(f"发送说说命令执行失败: {str(e)}")
return False, "命令执行失败", True
async def _generate_story_content(self, topic: str) -> str:
"""生成说说内容"""
try:
# 获取模型配置
models = llm_api.get_available_models()
text_model = str(self.get_config("models.text_model", "replyer_1"))
model_config = models.get(text_model)
if not model_config:
logger.error("未配置LLM模型")
return ""
# 获取机器人信息
bot_personality = config_api.get_global_config("personality.personality_core", "一个机器人")
bot_expression = config_api.get_global_config("personality.reply_style", "内容积极向上")
qq_account = config_api.get_global_config("bot.qq_account", "")
# 构建提示词
if topic:
prompt = f"""
你是'{bot_personality}',你想写一条主题是'{topic}'的说说发表在qq空间上
{bot_expression}
不要刻意突出自身学科背景,不要浮夸,不要夸张修辞,可以适当使用颜文字,
只输出一条说说正文的内容,不要有其他的任何正文以外的冗余输出
"""
else:
prompt = f"""
你是'{bot_personality}'你想写一条说说发表在qq空间上主题不限
{bot_expression}
不要刻意突出自身学科背景,不要浮夸,不要夸张修辞,可以适当使用颜文字,
只输出一条说说正文的内容,不要有其他的任何正文以外的冗余输出
"""
# 添加历史记录
prompt += "\n以下是你以前发过的说说,写新说说时注意不要在相隔不长的时间发送相同主题的说说"
history_block = await get_send_history(qq_account)
if history_block:
prompt += history_block
# 生成内容
success, story, reasoning, model_name = await llm_api.generate_with_model(
prompt=prompt,
model_config=model_config,
request_type="story.generate",
temperature=0.3,
max_tokens=1000
)
if success:
logger.info(f"成功生成说说内容:'{story}'")
return story
else:
logger.error("生成说说内容失败")
return ""
except Exception as e:
logger.error(f"生成说说内容异常: {str(e)}")
return ""
async def _handle_images(self, story: str):
"""处理说说配图"""
try:
enable_ai_image = bool(self.get_config("send.enable_ai_image", False))
apikey = str(self.get_config("models.siliconflow_apikey", ""))
image_dir = str(self.get_config("send.image_directory", "./plugins/Maizone/images"))
image_num_raw = self.get_config("send.ai_image_number", 1)
image_num = int(image_num_raw if image_num_raw is not None else 1)
if enable_ai_image and apikey:
await generate_image_by_sf(
api_key=apikey,
story=story,
image_dir=image_dir,
batch_size=image_num
)
elif enable_ai_image and not apikey:
logger.error('启用了AI配图但未填写API密钥')
except Exception as e:
logger.error(f"处理配图失败: {str(e)}")
async def _send_feed(self, story: str) -> bool:
"""发送说说到QQ空间"""
try:
# 获取配置
port = str(self.get_config("plugin.http_port", "3000"))
host = str(self.get_config("plugin.http_host", "127.0.0.1"))
qq_account = config_api.get_global_config("bot.qq_account", "")
enable_image = bool(self.get_config("send.enable_image", False))
image_dir = str(self.get_config("send.image_directory", "./plugins/Maizone/images"))
# 创建QZone管理器并发送
qzone_manager = QZoneManager(port, host)
success = await qzone_manager.send_feed(story, image_dir, qq_account, enable_image)
return success
except Exception as e:
logger.error(f"发送说说失败: {str(e)}")
return False
# ===== 发送说说动作组件 =====
class SendFeedAction(BaseAction):
"""发送说说动作 - 当用户要求发说说时激活"""
action_name = "send_feed"
action_description = "发一条相应主题的说说"
activation_type = ActionActivationType.KEYWORD
mode_enable = ChatMode.ALL
activation_keywords = ["说说", "空间", "动态"]
keyword_case_sensitive = False
action_parameters = {
"topic": "要发送的说说主题",
"user_name": "要求你发说说的好友的qq名称",
}
action_require = [
"用户要求发说说时使用",
"当有人希望你更新qq空间时使用",
"当你认为适合发说说时使用",
]
associated_types = ["text"]
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
# 获取配置加载器引用
self.config_loader = None
self._init_config_loader()
def _init_config_loader(self):
"""初始化配置加载器"""
try:
plugin_dir = os.path.dirname(__file__)
self.config_loader = MaiZoneConfigLoader(plugin_dir)
self.config_loader.load_config()
except Exception as e:
logger.error(f"初始化配置加载器失败: {e}")
def get_config(self, key: str, default=None):
"""获取配置值"""
if self.config_loader:
return self.config_loader.get_config(key, default)
return default
def check_permission(self, qq_account: str) -> bool:
"""检查用户权限"""
permission_list = self.get_config("send.permission", [])
permission_type = self.get_config("send.permission_type", "whitelist")
logger.info(f'权限检查: {permission_type}:{permission_list}')
if isinstance(permission_list, list):
if permission_type == 'whitelist':
return qq_account in permission_list
elif permission_type == 'blacklist':
return qq_account not in permission_list
logger.error('权限类型配置错误')
return False
async def execute(self) -> Tuple[bool, str]:
if self.plugin_dir is None:
self.plugin_dir = os.path.dirname(__file__)
"""执行发送说说动作"""
try:
# 获取用户信息
user_name = self.action_data.get("user_name", "")
person_id = person_api.get_person_id_by_name(user_name)
user_id = await person_api.get_person_value(person_id, "user_id")
# 权限检查
if not self.check_permission(user_id):
logger.info(f"用户 {user_id} 权限不足")
success, reply_set, _ = await generator_api.generate_reply(
chat_stream=self.chat_stream,
action_data={"extra_info_block": f'{user_name}无权命令你发送说说,请用符合你人格特点的方式拒绝请求'}
)
if success and reply_set:
for reply_type, reply_content in reply_set:
if reply_type == "text":
await self.send_text(reply_content)
return False, "权限不足"
# 获取主题并生成内容
topic = self.action_data.get("topic", "")
story = await self._generate_story_content(topic)
if not story:
return False, "生成说说内容失败"
# 处理图片
await self._handle_images(story)
# 发送说说
success = await self._send_feed(story)
if success:
logger.info(f"成功发送说说: {story}")
# 生成回复
success, reply_set, _ = await generator_api.generate_reply(
chat_stream=self.chat_stream,
action_data={"extra_info_block": f'你刚刚发了一条说说,内容为{story}'}
)
if success and reply_set:
for reply_type, reply_content in reply_set:
if reply_type == "text":
await self.send_text(reply_content)
return True, '发送成功'
else:
await self.send_text('我发了一条说说啦~')
return True, '发送成功但回复生成失败'
else:
return False, "发送说说失败"
except Exception as e:
logger.error(f"发送说说动作执行失败: {str(e)}")
return False, "动作执行失败"
async def _generate_story_content(self, topic: str) -> str:
"""生成说说内容"""
try:
# 获取模型配置
models = llm_api.get_available_models()
text_model = str(self.get_config("models.text_model", "replyer_1"))
model_config = models.get(text_model)
if not model_config:
return ""
# 获取机器人信息
bot_personality = config_api.get_global_config("personality.personality_core", "一个机器人")
bot_expression = config_api.get_global_config("expression.expression_style", "内容积极向上")
qq_account = config_api.get_global_config("bot.qq_account", "")
# 构建提示词
prompt = f"""
你是{bot_personality},你想写一条主题是{topic}的说说发表在qq空间上
{bot_expression}
不要刻意突出自身学科背景,不要浮夸,不要夸张修辞,可以适当使用颜文字,
只输出一条说说正文的内容,不要有其他的任何正文以外的冗余输出
"""
# 添加历史记录
prompt += "\n以下是你以前发过的说说,写新说说时注意不要在相隔不长的时间发送相同主题的说说"
history_block = await get_send_history(qq_account)
if history_block:
prompt += history_block
# 生成内容
success, story, reasoning, model_name = await llm_api.generate_with_model(
prompt=prompt,
model_config=model_config,
request_type="story.generate",
temperature=0.3,
max_tokens=1000
)
if success:
return story
else:
return ""
except Exception as e:
logger.error(f"生成说说内容异常: {str(e)}")
return ""
async def _handle_images(self, story: str):
"""处理说说配图"""
try:
enable_ai_image = bool(self.get_config("send.enable_ai_image", False))
apikey = str(self.get_config("models.siliconflow_apikey", ""))
image_dir = str(self.get_config("send.image_directory", "./plugins/Maizone/images"))
image_num_raw = self.get_config("send.ai_image_number", 1)
image_num = int(image_num_raw if image_num_raw is not None else 1)
if enable_ai_image and apikey:
await generate_image_by_sf(
api_key=apikey,
story=story,
image_dir=image_dir,
batch_size=image_num
)
elif enable_ai_image and not apikey:
logger.error('启用了AI配图但未填写API密钥')
except Exception as e:
logger.error(f"处理配图失败: {str(e)}")
async def _send_feed(self, story: str) -> bool:
"""发送说说到QQ空间"""
try:
# 获取配置
port = str(self.get_config("plugin.http_port", "3000"))
host = str(self.get_config("plugin.http_host", "127.0.0.1"))
qq_account = config_api.get_global_config("bot.qq_account", "")
enable_image = bool(self.get_config("send.enable_image", False))
image_dir = str(self.get_config("send.image_directory", "./plugins/Maizone/images"))
# 创建QZone管理器并发送
qzone_manager = QZoneManager(port, host)
success = await qzone_manager.send_feed(story, image_dir, qq_account, enable_image)
return success
except Exception as e:
logger.error(f"发送说说失败: {str(e)}")
return False
# ===== 阅读说说动作组件 =====
class ReadFeedAction(BaseAction):
"""阅读说说动作 - 当用户要求读说说时激活"""
action_name = "read_feed"
action_description = "读取好友最近的动态/说说/qq空间并评论点赞"
activation_type = ActionActivationType.KEYWORD
mode_enable = ChatMode.ALL
activation_keywords = ["说说", "空间", "动态"]
keyword_case_sensitive = False
action_parameters = {
"target_name": "需要阅读动态的好友的qq名称",
"user_name": "要求你阅读动态的好友的qq名称"
}
action_require = [
"需要阅读某人动态、说说、QQ空间时使用",
"当有人希望你评价某人的动态、说说、QQ空间",
"当你认为适合阅读说说、动态、QQ空间时使用",
]
associated_types = ["text"]
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
# 获取配置加载器引用
self.config_loader = None
self._init_config_loader()
def _init_config_loader(self):
"""初始化配置加载器"""
try:
plugin_dir = os.path.dirname(__file__)
self.config_loader = MaiZoneConfigLoader(plugin_dir)
self.config_loader.load_config()
except Exception as e:
logger.error(f"初始化配置加载器失败: {e}")
def get_config(self, key: str, default=None):
"""获取配置值"""
if self.config_loader:
return self.config_loader.get_config(key, default)
return default
def check_permission(self, qq_account: str) -> bool:
"""检查用户权限"""
permission_list = self.get_config("read.permission", [])
permission_type = self.get_config("read.permission_type", "blacklist")
if not isinstance(permission_list, list):
return False
logger.info(f'权限检查: {permission_type}:{permission_list}')
if permission_type == 'whitelist':
return qq_account in permission_list
elif permission_type == 'blacklist':
return qq_account not in permission_list
else:
logger.error('权限类型配置错误')
return False
async def execute(self) -> Tuple[bool, str]:
"""执行阅读说说动作"""
try:
# 获取用户信息
user_name = self.action_data.get("user_name", "")
person_id = person_api.get_person_id_by_name(user_name)
user_id = await person_api.get_person_value(person_id, "user_id")
# 权限检查
if not self.check_permission(user_id):
logger.info(f"用户 {user_id} 权限不足")
success, reply_set, _ = await generator_api.generate_reply(
chat_stream=self.chat_stream,
action_data={"extra_info_block": f'{user_name}无权命令你阅读说说,请用符合人格的方式进行拒绝的回复'}
)
if success and reply_set:
for reply_type, reply_content in reply_set:
if reply_type == "text":
await self.send_text(reply_content)
return False, "权限不足"
# 获取目标用户
target_name = self.action_data.get("target_name", "")
target_person_id = person_api.get_person_id_by_name(target_name)
target_qq = await person_api.get_person_value(target_person_id, "user_id")
# 读取并处理说说
success = await self._read_and_process_feeds(target_qq, target_name)
if success:
# 生成回复
success, reply_set, _ = await generator_api.generate_reply(
chat_stream=self.chat_stream,
action_data={"extra_info_block": f'你刚刚成功读了{target_name}的说说,请告知你已经读了说说'}
)
if success and reply_set:
for reply_type, reply_content in reply_set:
if reply_type == "text":
await self.send_text(reply_content)
return True, '阅读成功'
return True, '阅读成功但回复生成失败'
else:
return False, "阅读说说失败"
except Exception as e:
logger.error(f"阅读说说动作执行失败: {str(e)}")
return False, "动作执行失败"
async def _read_and_process_feeds(self, target_qq: str, target_name: str) -> bool:
"""读取并处理说说"""
try:
# 获取配置
port = str(self.get_config("plugin.http_port", "3000"))
host = str(self.get_config("plugin.http_host", "127.0.0.1"))
qq_account = config_api.get_global_config("bot.qq_account", "")
num_raw = self.get_config("read.read_number", 5)
num = int(num_raw if num_raw is not None else 5)
like_raw = self.get_config("read.like_possibility", 1.0)
like_possibility = float(like_raw if like_raw is not None else 1.0)
comment_raw = self.get_config("read.comment_possibility", 1.0)
comment_possibility = float(comment_raw if comment_raw is not None else 1.0)
# 创建QZone管理器并读取说说
qzone_manager = QZoneManager(port, host)
feeds_list = await qzone_manager.read_feed(qq_account, target_qq, num)
# 处理错误情况
if isinstance(feeds_list, list) and len(feeds_list) > 0 and isinstance(feeds_list[0], dict) and 'error' in feeds_list[0]:
success, reply_set, _ = await generator_api.generate_reply(
chat_stream=self.chat_stream,
action_data={"extra_info_block": f'你在读取说说的时候出现了错误,错误原因:{feeds_list[0].get("error")}'}
)
if success and reply_set:
for reply_type, reply_content in reply_set:
if reply_type == "text":
await self.send_text(reply_content)
return True
# 处理说说列表
if isinstance(feeds_list, list):
logger.info(f"成功读取到{len(feeds_list)}条说说")
for feed in feeds_list:
# 随机延迟
time.sleep(3 + random.random())
# 处理说说内容
await self._process_single_feed(
feed, target_qq, target_name,
like_possibility, comment_possibility, qzone_manager
)
return True
else:
return False
except Exception as e:
logger.error(f"读取并处理说说失败: {str(e)}")
return False
async def _process_single_feed(self, feed: dict, target_qq: str, target_name: str,
like_possibility: float, comment_possibility: float,
qzone_manager):
"""处理单条说说"""
try:
content = feed.get("content", "")
images = feed.get("images", [])
if images:
for image in images:
content = content + str(image)
fid = feed.get("tid", "")
rt_con = feed.get("rt_con", "")
# 随机评论
if random.random() <= comment_possibility:
comment = await self._generate_comment(content, rt_con, target_name)
if comment:
success = await qzone_manager.comment_feed(
config_api.get_global_config("bot.qq_account", ""),
target_qq, fid, comment
)
if success:
logger.info(f"发送评论'{comment}'成功")
else:
logger.error(f"评论说说'{content[:20]}...'失败")
# 随机点赞
if random.random() <= like_possibility:
success = await qzone_manager.like_feed(
config_api.get_global_config("bot.qq_account", ""),
target_qq, fid
)
if success:
logger.info(f"点赞说说'{content[:10]}..'成功")
else:
logger.error(f"点赞说说'{content[:20]}...'失败")
except Exception as e:
logger.error(f"处理单条说说失败: {str(e)}")
async def _generate_comment(self, content: str, rt_con: str, target_name: str) -> str:
"""生成评论内容"""
try:
# 获取模型配置
models = llm_api.get_available_models()
text_model = str(self.get_config("models.text_model", "replyer_1"))
model_config = models.get(text_model)
if not model_config:
return ""
# 获取机器人信息
bot_personality = config_api.get_global_config("personality.personality_core", "一个机器人")
bot_expression = config_api.get_global_config("expression.expression_style", "内容积极向上")
# 构建提示词
if not rt_con:
prompt = f"""
你是'{bot_personality}',你正在浏览你好友'{target_name}'的QQ空间
你看到了你的好友'{target_name}'qq空间上内容是'{content}'的说说,你想要发表你的一条评论,
{bot_expression},回复的平淡一些,简短一些,说中文,
不要刻意突出自身学科背景,不要浮夸,不要夸张修辞,不要输出多余内容(包括前后缀,冒号和引号,括号()表情包at或 @等 )。只输出回复内容
"""
else:
prompt = f"""
你是'{bot_personality}',你正在浏览你好友'{target_name}'的QQ空间
你看到了你的好友'{target_name}'在qq空间上转发了一条内容为'{rt_con}'的说说,你的好友的评论为'{content}'
你想要发表你的一条评论,{bot_expression},回复的平淡一些,简短一些,说中文,
不要刻意突出自身学科背景,不要浮夸,不要夸张修辞,不要输出多余内容(包括前后缀,冒号和引号,括号()表情包at或 @等 )。只输出回复内容
"""
logger.info(f"正在评论'{target_name}'的说说:{content[:20]}...")
# 生成评论
success, comment, reasoning, model_name = await llm_api.generate_with_model(
prompt=prompt,
model_config=model_config,
request_type="story.generate",
temperature=0.3,
max_tokens=1000
)
if success:
logger.info(f"成功生成评论内容:'{comment}'")
return comment
else:
logger.error("生成评论内容失败")
return ""
except Exception as e:
logger.error(f"生成评论内容异常: {str(e)}")
return ""
# ===== 插件主类 =====
@register_plugin
class MaiZonePlugin(BasePlugin):
"""MaiZone插件 - 让麦麦发QQ空间"""
# 插件基本信息
plugin_name: str = "MaiZonePlugin"
enable_plugin: bool = True
dependencies: List[str] = []
python_dependencies: List[str] = []
config_file_name: str = "config.toml"
# 配置节描述
config_section_descriptions = {
"plugin": "插件基础配置",
"models": "模型相关配置",
"send": "发送说说配置",
"read": "阅读说说配置",
"monitor": "自动监控配置",
"schedule": "定时发送配置",
}
# 配置模式定义
config_schema: dict = {
"plugin": {
"enable": ConfigField(type=bool, default=True, description="是否启用插件"),
"config_version": ConfigField(type=str, default="2.0.0", description="配置文件版本"),
"http_port": ConfigField(type=str, default='3000', description="NapCat HTTP服务器端口号"),
"http_host": ConfigField(type=str, default='127.0.0.1', description="NapCat HTTP服务器地址"),
},
"models": {
"text_model": ConfigField(type=str, default="replyer_1", description="生成文本的模型名称"),
"siliconflow_apikey": ConfigField(type=str, default="", description="硅基流动AI生图API密钥"),
},
"send": {
"permission": ConfigField(type=list, default=['2488036428'], description="发送权限QQ号列表"),
"permission_type": ConfigField(type=str, default='whitelist', description="权限类型whitelist(白名单) 或 blacklist(黑名单)"),
"enable_image": ConfigField(type=bool, default=False, description="是否启用说说配图"),
"enable_ai_image": ConfigField(type=bool, default=False, description="是否启用AI生成配图"),
"enable_reply": ConfigField(type=bool, default=True, description="生成完成时是否发出回复"),
"ai_image_number": ConfigField(type=int, default=1, description="AI生成图片数量(1-4张)"),
"image_directory": ConfigField(type=str, default="./plugins/built_in/Maizone/images", description="图片存储目录")
},
"read": {
"permission": ConfigField(type=list, default=[], description="阅读权限QQ号列表"),
"permission_type": ConfigField(type=str, default='blacklist', description="权限类型whitelist(白名单) 或 blacklist(黑名单)"),
"read_number": ConfigField(type=int, default=5, description="一次读取的说说数量"),
"like_possibility": ConfigField(type=float, default=1.0, description="点赞概率(0.0-1.0)"),
"comment_possibility": ConfigField(type=float, default=0.3, description="评论概率(0.0-1.0)"),
},
"monitor": {
"enable_auto_monitor": ConfigField(type=bool, default=False, description="是否启用自动监控好友说说"),
"interval_minutes": ConfigField(type=int, default=10, description="监控间隔时间(分钟)"),
},
"schedule": {
"enable_schedule": ConfigField(type=bool, default=False, description="是否启用定时发送说说"),
"schedules": ConfigField(
type=list,
default=[
{"time": "08:00", "topic": "早安"},
{"time": "22:00", "topic": "晚安"}
],
description="定时发送任务列表"
),
},
}
def __init__(self, *args, **kwargs):
"""初始化插件"""
super().__init__(*args, **kwargs)
# 设置插件信息
self.plugin_name = "MaiZone"
self.plugin_description = "让麦麦实现QQ空间点赞、评论、发说说功能"
self.plugin_version = "2.0.0"
self.plugin_author = "重构版"
self.config_file_name = "config.toml"
# 初始化独立配置加载器
plugin_dir = self.plugin_dir
if plugin_dir is None:
plugin_dir = os.path.dirname(__file__)
self.config_loader = MaiZoneConfigLoader(plugin_dir, self.config_file_name)
# 加载配置
if not self.config_loader.load_config():
logger.error("配置加载失败,使用默认设置")
# 获取启用状态
self.enable_plugin = self.config_loader.get_config("plugin.enable", True)
# 初始化管理器
self.monitor_manager = None
self.schedule_manager = None
# 根据配置启动功能
if self.enable_plugin:
self._init_managers()
def _init_managers(self):
"""初始化管理器"""
try:
# 初始化监控管理器
if self.config_loader.get_config("monitor.enable_auto_monitor", False):
from .monitor import MonitorManager
self.monitor_manager = MonitorManager(self)
asyncio.create_task(self._start_monitor_delayed())
# 初始化定时管理器
if self.config_loader.get_config("schedule.enable_schedule", False):
logger.info(f"定时任务启用状态: true")
self.schedule_manager = ScheduleManager(self)
asyncio.create_task(self._start_scheduler_delayed())
except Exception as e:
logger.error(f"初始化管理器失败: {str(e)}")
async def _start_monitor_delayed(self):
"""延迟启动监控管理器"""
try:
await asyncio.sleep(10) # 等待插件完全初始化
if self.monitor_manager:
await self.monitor_manager.start()
except Exception as e:
logger.error(f"启动监控管理器失败: {str(e)}")
async def _start_scheduler_delayed(self):
"""延迟启动定时管理器"""
try:
await asyncio.sleep(10) # 等待插件完全初始化
if self.schedule_manager:
await self.schedule_manager.start()
except Exception as e:
logger.error(f"启动定时管理器失败: {str(e)}")
def get_plugin_components(self) -> List[Tuple[ComponentInfo, Type]]:
"""获取插件组件列表"""
return [
(SendFeedAction.get_action_info(), SendFeedAction),
(ReadFeedAction.get_action_info(), ReadFeedAction),
(SendFeedCommand.get_command_info(), SendFeedCommand)
]

File diff suppressed because it is too large Load Diff

View File

@@ -0,0 +1,362 @@
import asyncio
import datetime
import time
import traceback
import os
import toml
from typing import Dict, List, Any
from src.common.logger import get_logger
from src.plugin_system.apis import llm_api, config_api
# 导入工具模块
import sys
import os
sys.path.append(os.path.dirname(__file__))
from qzone_utils import QZoneManager, get_send_history
# 获取日志记录器
logger = get_logger('MaiZone-Scheduler')
class ScheduleManager:
"""定时任务管理器 - 负责定时发送说说"""
def __init__(self, plugin):
"""初始化定时任务管理器"""
self.plugin = plugin
self.is_running = False
self.task = None
self.last_send_times: Dict[str, float] = {} # 记录每个时间点的最后发送时间
self.config_file_path = os.path.join(os.path.dirname(__file__), "config.toml")
logger.info("定时任务管理器初始化完成")
# 初始化时测试配置读取
def _read_schedule_config(self) -> List[Dict[str, str]]:
"""直接从TOML配置文件读取日程配置"""
try:
if not os.path.exists(self.config_file_path):
logger.error(f"配置文件不存在: {self.config_file_path}")
return []
# 读取TOML文件
with open(self.config_file_path, 'r', encoding='utf-8') as f:
config_data = toml.load(f)
# 获取schedule配置
schedule_config = config_data.get('schedule', {})
schedules = schedule_config.get('schedules', [])
logger.info(f"从配置文件读取到 {len(schedules)} 个定时任务")
# 验证每个日程的格式
valid_schedules = []
for i, schedule in enumerate(schedules):
if isinstance(schedule, dict) and 'time' in schedule and 'topic' in schedule:
valid_schedules.append({
'time': str(schedule['time']),
'topic': str(schedule['topic'])
})
logger.debug(f"日程 {i+1}: {schedule['time']} - {schedule['topic']}")
else:
logger.warning(f"跳过无效的日程配置: {schedule}")
return valid_schedules
except Exception as e:
logger.error(f"读取日程配置失败: {str(e)}")
return []
def _is_schedule_enabled(self) -> bool:
"""检查定时任务是否启用"""
try:
if not os.path.exists(self.config_file_path):
return False
with open(self.config_file_path, 'r', encoding='utf-8') as f:
config_data = toml.load(f)
schedule_config = config_data.get('schedule', {})
enabled = schedule_config.get('enable_schedule', False)
logger.debug(f"定时任务启用状态: {enabled}")
return bool(enabled)
except Exception as e:
logger.error(f"检查定时任务启用状态失败: {str(e)}")
return False
async def start(self):
"""启动定时任务"""
if self.is_running:
logger.warning("定时任务已在运行中")
return
self.is_running = True
self.task = asyncio.create_task(self._schedule_loop())
logger.info("定时发送说说任务已启动")
async def stop(self):
"""停止定时任务"""
if not self.is_running:
return
self.is_running = False
if self.task:
self.task.cancel()
try:
await self.task
except asyncio.CancelledError:
logger.info("定时任务已被取消")
logger.info("定时发送说说任务已停止")
async def _schedule_loop(self):
"""定时任务主循环"""
while self.is_running:
try:
# 检查定时任务是否启用
if not self._is_schedule_enabled():
logger.info("定时任务已禁用,等待下次检查")
await asyncio.sleep(60)
continue
# 获取当前时间
current_time = datetime.datetime.now().strftime("%H:%M")
# 直接从配置文件读取定时任务配置
schedules = self._read_schedule_config()
if not schedules:
logger.info("未找到有效的定时任务配置")
await asyncio.sleep(60)
continue
# 检查每个定时任务
for schedule in schedules:
await self._check_and_execute_schedule(schedule, current_time)
# 每分钟检查一次
await asyncio.sleep(60)
except asyncio.CancelledError:
logger.info("定时任务循环被取消")
break
except Exception as e:
logger.error(f"定时任务循环出错: {str(e)}")
logger.error(traceback.format_exc())
# 出错后等待5分钟再重试
await asyncio.sleep(300)
async def _check_and_execute_schedule(self, schedule: Dict[str, Any], current_time: str):
"""检查并执行定时任务"""
try:
schedule_time = schedule.get("time", "")
topic = schedule.get("topic", "")
# 检查是否到达发送时间
if current_time == schedule_time:
# 避免同一分钟内重复发送
last_send_time = self.last_send_times.get(schedule_time, 0)
current_timestamp = time.time()
if current_timestamp - last_send_time > 60: # 超过1分钟才允许发送
logger.info(f"定时任务触发: {schedule_time} - 主题: {topic}")
self.last_send_times[schedule_time] = current_timestamp
# 执行发送任务
success = await self._execute_scheduled_send(topic)
if success:
logger.info(f"定时说说发送成功: {topic}")
else:
logger.error(f"定时说说发送失败: {topic}")
else:
logger.debug(f"跳过重复发送: {schedule_time}")
except Exception as e:
logger.error(f"检查定时任务失败: {str(e)}")
async def _execute_scheduled_send(self, topic: str) -> bool:
"""执行定时发送任务"""
try:
# 生成说说内容
story = await self._generate_story_content(topic)
if not story:
logger.error("生成定时说说内容失败")
return False
logger.info(f"定时任务生成说说内容: '{story}'")
# 处理配图
await self._handle_images(story)
# 发送说说
success = await self._send_scheduled_feed(story)
return success
except Exception as e:
logger.error(f"执行定时发送任务失败: {str(e)}")
return False
async def _generate_story_content(self, topic: str) -> str:
"""生成定时说说内容"""
try:
# 获取模型配置
models = llm_api.get_available_models()
text_model = str(self.plugin.get_config("models.text_model", "replyer_1"))
model_config = models.get(text_model)
if not model_config:
logger.error("未配置LLM模型")
return ""
# 获取机器人信息
bot_personality = config_api.get_global_config("personality.personality_core", "一个机器人")
bot_expression = config_api.get_global_config("expression.expression_style", "内容积极向上")
qq_account = config_api.get_global_config("bot.qq_account", "")
# 构建提示词
if topic:
prompt = f"""
你是'{bot_personality}',你想写一条主题是'{topic}'的说说发表在qq空间上
{bot_expression}
不要刻意突出自身学科背景,不要浮夸,不要夸张修辞,可以适当使用颜文字,
只输出一条说说正文的内容,不要有其他的任何正文以外的冗余输出
"""
else:
prompt = f"""
你是'{bot_personality}'你想写一条说说发表在qq空间上主题不限
{bot_expression}
不要刻意突出自身学科背景,不要浮夸,不要夸张修辞,可以适当使用颜文字,
只输出一条说说正文的内容,不要有其他的任何正文以外的冗余输出
"""
# 添加历史记录避免重复
prompt += "\n以下是你最近发过的说说,写新说说时注意不要在相隔不长的时间发送相似内容的说说\n"
history_block = await get_send_history(qq_account)
if history_block:
prompt += history_block
# 生成内容
success, story, reasoning, model_name = await llm_api.generate_with_model(
prompt=prompt,
model_config=model_config,
request_type="story.generate",
temperature=0.3,
max_tokens=1000
)
if success:
return story
else:
logger.error("生成定时说说内容失败")
return ""
except Exception as e:
logger.error(f"生成定时说说内容异常: {str(e)}")
return ""
async def _handle_images(self, story: str):
"""处理定时说说配图"""
try:
enable_ai_image = bool(self.plugin.get_config("send.enable_ai_image", False))
apikey = str(self.plugin.get_config("models.siliconflow_apikey", ""))
image_dir = str(self.plugin.get_config("send.image_directory", "./plugins/Maizone/images"))
image_num = int(self.plugin.get_config("send.ai_image_number", 1) or 1)
if enable_ai_image and apikey:
from qzone_utils import generate_image_by_sf
await generate_image_by_sf(
api_key=apikey,
story=story,
image_dir=image_dir,
batch_size=image_num
)
logger.info("定时任务AI配图生成完成")
elif enable_ai_image and not apikey:
logger.warning('启用了AI配图但未填写API密钥')
except Exception as e:
logger.error(f"处理定时说说配图失败: {str(e)}")
async def _send_scheduled_feed(self, story: str) -> bool:
"""发送定时说说"""
try:
# 获取配置
port = str(self.plugin.get_config("plugin.http_port", "3000"))
host = self.plugin.get_config("plugin.http_host", "127.0.0.1")
qq_account = config_api.get_global_config("bot.qq_account", "")
enable_image = self.plugin.get_config("send.enable_image", False)
image_dir = str(self.plugin.get_config("send.image_directory", "./plugins/Maizone/images"))
# 创建QZone管理器并发送
qzone_manager = QZoneManager(port, host)
success = await qzone_manager.send_feed(story, image_dir, qq_account, enable_image)
if success:
logger.info(f"定时说说发送成功: {story}")
else:
logger.error("定时说说发送失败")
return success
except Exception as e:
logger.error(f"发送定时说说失败: {str(e)}")
return False
def get_status(self) -> Dict[str, Any]:
"""获取定时任务状态"""
return {
"is_running": self.is_running,
"enabled": self._is_schedule_enabled(),
"schedules": self._read_schedule_config(),
"last_send_times": self.last_send_times
}
def add_schedule(self, time_str: str, topic: str) -> bool:
"""添加定时任务"""
try:
schedules = self.plugin.get_config("schedule.schedules", [])
new_schedule = {"time": time_str, "topic": topic}
# 检查是否已存在相同时间的任务
for schedule in schedules:
if isinstance(schedule, dict) and schedule.get("time") == time_str:
logger.warning(f"时间 {time_str} 已存在定时任务")
return False
schedules.append(new_schedule)
# 注意:这里需要插件系统支持动态更新配置
logger.info(f"添加定时任务: {time_str} - {topic}")
return True
except Exception as e:
logger.error(f"添加定时任务失败: {str(e)}")
return False
def remove_schedule(self, time_str: str) -> bool:
"""移除定时任务"""
try:
schedules = self.plugin.get_config("schedule.schedules", [])
original_count = len(schedules)
# 过滤掉指定时间的任务
schedules = [s for s in schedules if not (isinstance(s, dict) and s.get("time") == time_str)]
if len(schedules) < original_count:
# 注意:这里需要插件系统支持动态更新配置
logger.info(f"移除定时任务: {time_str}")
return True
else:
logger.warning(f"未找到时间为 {time_str} 的定时任务")
return False
except Exception as e:
logger.error(f"移除定时任务失败: {str(e)}")
return False

View File

@@ -0,0 +1,27 @@
{
"manifest_version": 1,
"name": "web_search_tool",
"version": "1.0.0",
"description": "一个用于在互联网上搜索信息的工具",
"author": {
"name": "MaiBot-Plus开发团队",
"url": "https://github.com/MaiBot-Plus"
},
"license": "GPL-v3.0-or-later",
"host_application": {
"min_version": "0.10.0"
},
"homepage_url": "https://github.com/MaiBot-Plus/mmc",
"repository_url": "https://github.com/MaiBot-Plus/mmc",
"keywords": ["web_search", "url_parser"],
"categories": ["web_search", "url_parser"],
"default_locale": "zh-CN",
"locales_path": "_locales",
"plugin_info": {
"is_built_in": false,
"plugin_type": "web_search"
}
}

View File

@@ -0,0 +1,372 @@
import os
import asyncio
import functools
from typing import Any, Dict, List
from datetime import datetime, timedelta
from exa_py import Exa
import asyncio
from asyncddgs import aDDGS
from src.common.logger import get_logger
from typing import Tuple,Type
from src.plugin_system import (
BasePlugin,
register_plugin,
BaseAction,
BaseCommand,
BaseTool,
ComponentInfo,
ActionActivationType,
ConfigField,
BaseEventHandler,
llm_api,
EventType,
MaiMessages,
ToolParamType
)
from src.config.config import global_config
import httpx
from bs4 import BeautifulSoup
logger = get_logger("web_surfing_tool")
class WebSurfingTool(BaseTool):
name: str = "web_search"
description: str = "当需要回答关于最新事件、实时信息或用户不清楚的特定主题时,使用此工具在互联网上搜索信息。"
parameters = [
("query", ToolParamType.STRING, "要搜索的关键词或问题。", True, None),
("num_results", ToolParamType.INTEGER, "期望每个搜索引擎返回的搜索结果数量默认为5。", False, "5"),
("time_range", ToolParamType.STRING, "指定搜索的时间范围,可以是 'any', 'week', 'month'。默认为 'any'", False, "any")
] # type: ignore
def __init__(self):
EXA_API_KEY = self.get_config("exa.api_key", None)
super().__init__()
self.exa = Exa(api_key=EXA_API_KEY) if EXA_API_KEY else None
if not self.exa:
logger.warning("Exa API Key 未配置Exa 搜索功能将不可用。")
async def execute(self, function_args: Dict[str, Any]) -> Dict[str, Any]:
query = function_args.get("query")
if not query:
return {"error": "搜索查询不能为空。"}
logger.info(f"开始并行搜索,参数: '{function_args}'")
search_tasks = []
if self.exa:
search_tasks.append(self._search_exa(function_args))
search_tasks.append(self._search_ddg(function_args))
try:
search_results_lists = await asyncio.gather(*search_tasks, return_exceptions=True)
all_results = []
for result in search_results_lists:
if isinstance(result, list):
all_results.extend(result)
elif isinstance(result, Exception):
logger.error(f"搜索时发生错误: {result}")
# 去重并格式化
unique_results = self._deduplicate_results(all_results)
formatted_content = self._format_results(unique_results)
result_package = {
"type": "web_search_result",
"content": formatted_content,
}
return result_package
except Exception as e:
logger.error(f"执行并行网络搜索时发生异常: {e}", exc_info=True)
return {"error": f"执行网络搜索时发生严重错误: {str(e)}"}
def _deduplicate_results(self, results: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
unique_urls = set()
unique_results = []
for res in results:
if isinstance(res, dict) and res.get("url") and res["url"] not in unique_urls:
unique_urls.add(res["url"])
unique_results.append(res)
return unique_results
async def _search_exa(self, args: Dict[str, Any]) -> List[Dict[str, Any]]:
query = args["query"]
num_results = args.get("num_results", 3)
time_range = args.get("time_range", "any")
exa_args = {"num_results": num_results, "text": True, "highlights": True}
if time_range != "any":
today = datetime.now()
start_date = today - timedelta(days=7 if time_range == "week" else 30)
exa_args["start_published_date"] = start_date.strftime('%Y-%m-%d')
try:
if not self.exa:
return []
loop = asyncio.get_running_loop()
func = functools.partial(self.exa.search_and_contents, query, **exa_args)
search_response = await loop.run_in_executor(None, func)
return [
{
"title": res.title,
"url": res.url,
"snippet": " ".join(getattr(res, 'highlights', [])) or (getattr(res, 'text', '')[:250] + '...'),
"provider": "Exa"
}
for res in search_response.results
]
except Exception as e:
logger.error(f"Exa 搜索失败: {e}")
return []
async def _search_ddg(self, args: Dict[str, Any]) -> List[Dict[str, Any]]:
query = args["query"]
num_results = args.get("num_results", 3)
try:
async with aDDGS() as ddgs:
search_response = await ddgs.text(query, max_results=num_results)
return [
{
"title": r.get("title"),
"url": r.get("href"),
"snippet": r.get("body"),
"provider": "DuckDuckGo"
}
for r in search_response
]
except Exception as e:
logger.error(f"DuckDuckGo 搜索失败: {e}")
return []
def _format_results(self, results: List[Dict[str, Any]]) -> str:
if not results:
return "没有找到相关的网络信息。"
formatted_string = "根据网络搜索结果:\n\n"
for i, res in enumerate(results, 1):
title = res.get("title", '无标题')
url = res.get("url", '#')
snippet = res.get("snippet", '无摘要')
provider = res.get("provider", "未知来源")
formatted_string += f"{i}. **{title}** (来自: {provider})\n"
formatted_string += f" - 摘要: {snippet}\n"
formatted_string += f" - 来源: {url}\n\n"
return formatted_string
class URLParserTool(BaseTool):
"""
一个用于解析和总结一个或多个网页URL内容的工具。
"""
name: str = "parse_url"
description: str = "当需要理解一个或多个特定网页链接的内容时,使用此工具。例如:'这些网页讲了什么?[https://example.com, https://example2.com]''帮我总结一下这些文章'"
parameters = [
("urls", ToolParamType.STRING, "要理解的网站", True, None),
]
def __init__(self):
super().__init__()
EXA_API_KEY = self.get_config("exa.api_key", None)
if not EXA_API_KEY or EXA_API_KEY == "YOUR_API_KEY_HERE":
self.exa = None
logger.error("Exa API Key 未配置URL解析功能将受限。")
else:
self.exa = Exa(api_key=EXA_API_KEY)
async def _local_parse_and_summarize(self, url: str) -> Dict[str, Any]:
"""
使用本地库(httpx, BeautifulSoup)解析URL并调用LLM进行总结。
"""
try:
async with httpx.AsyncClient(timeout=15.0, follow_redirects=True) as client:
response = await client.get(url)
response.raise_for_status()
soup = BeautifulSoup(response.text, "html.parser")
title = soup.title.string if soup.title else "无标题"
for script in soup(["script", "style"]):
script.extract()
text = soup.get_text(separator="\n", strip=True)
if not text:
return {"error": "无法从页面提取有效文本内容。"}
summary_prompt = f"请根据以下网页内容生成一段不超过300字的中文摘要保留核心信息和关键点:\n\n---\n\n标题: {title}\n\n内容:\n{text[:4000]}\n\n---\n\n摘要:"
text_model = str(self.get_config("models.text_model", "replyer_1"))
models = llm_api.get_available_models()
model_config = models.get(text_model)
if not model_config:
logger.error("未配置LLM模型")
return {"error": "未配置LLM模型"}
success, summary, reasoning, model_name = await llm_api.generate_with_model(
prompt=summary_prompt,
model_config=model_config,
request_type="story.generate",
temperature=0.3,
max_tokens=1000
)
if not success:
logger.info(f"生成摘要失败: {summary}")
return {"error": "发生ai错误"}
logger.info(f"成功生成摘要内容:'{summary}'")
return {
"title": title,
"url": url,
"snippet": summary,
"source": "local"
}
except httpx.HTTPStatusError as e:
logger.warning(f"本地解析URL '{url}' 失败 (HTTP {e.response.status_code})")
return {"error": f"请求失败,状态码: {e.response.status_code}"}
except Exception as e:
logger.error(f"本地解析或总结URL '{url}' 时发生未知异常: {e}", exc_info=True)
return {"error": f"发生未知错误: {str(e)}"}
async def execute(self, function_args: Dict[str, Any]) -> Dict[str, Any]:
"""
执行URL内容提取和总结。优先使用Exa失败后尝试本地解析。
"""
urls = function_args.get("urls")
if not urls:
return {"error": "URL列表不能为空。"}
successful_results = []
error_messages = []
urls_to_retry_locally = []
# 步骤 1: 尝试使用 Exa API 进行解析
contents_response = None
if self.exa:
logger.info(f"开始使用 Exa API 解析URL: {urls}")
try:
loop = asyncio.get_running_loop()
exa_params = {"text": True, "summary": True, "highlights": True}
func = functools.partial(self.exa.get_contents, urls, **exa_params)
contents_response = await loop.run_in_executor(None, func)
except Exception as e:
logger.error(f"执行 Exa URL解析时发生严重异常: {e}", exc_info=True)
contents_response = None # 确保异常后为None
# 步骤 2: 处理Exa的响应
if contents_response and hasattr(contents_response, 'statuses'):
results_map = {res.url: res for res in contents_response.results} if hasattr(contents_response, 'results') else {}
if contents_response.statuses:
for status in contents_response.statuses:
if status.status == 'success':
res = results_map.get(status.id)
if res:
summary = getattr(res, 'summary', '')
highlights = " ".join(getattr(res, 'highlights', []))
text_snippet = (getattr(res, 'text', '')[:300] + '...') if getattr(res, 'text', '') else ''
snippet = summary or highlights or text_snippet or '无摘要'
successful_results.append({
"title": getattr(res, 'title', '无标题'),
"url": getattr(res, 'url', status.id),
"snippet": snippet,
"source": "exa"
})
else:
error_tag = getattr(status, 'error', '未知错误')
logger.warning(f"Exa解析URL '{status.id}' 失败: {error_tag}。准备本地重试。")
urls_to_retry_locally.append(status.id)
else:
# 如果Exa未配置、API调用失败或返回无效响应则所有URL都进入本地重试
urls_to_retry_locally.extend(url for url in urls if url not in [res['url'] for res in successful_results])
# 步骤 3: 对失败的URL进行本地解析
if urls_to_retry_locally:
logger.info(f"开始本地解析以下URL: {urls_to_retry_locally}")
local_tasks = [self._local_parse_and_summarize(url) for url in urls_to_retry_locally]
local_results = await asyncio.gather(*local_tasks)
for i, res in enumerate(local_results):
url = urls_to_retry_locally[i]
if "error" in res:
error_messages.append(f"URL: {url} - 解析失败: {res['error']}")
else:
successful_results.append(res)
if not successful_results:
return {"error": "无法从所有给定的URL获取内容。", "details": error_messages}
formatted_content = self._format_results(successful_results)
result = {
"type": "url_parse_result",
"content": formatted_content,
"errors": error_messages
}
return result
def _format_results(self, results: List[Dict[str, Any]]) -> str:
"""
将成功解析的结果列表格式化为一段简洁的文本。
"""
formatted_parts = []
for res in results:
title = res.get('title', '无标题')
url = res.get('url', '#')
snippet = res.get('snippet', '无摘要')
source = res.get('source', '未知')
formatted_string = f"**{title}**\n"
formatted_string += f"**内容摘要**:\n{snippet}\n"
formatted_string += f"**来源**: {url} (由 {source} 解析)\n"
formatted_parts.append(formatted_string)
return "\n---\n".join(formatted_parts)
@register_plugin
class WEBSEARCHPLUGIN(BasePlugin):
# 插件基本信息
plugin_name: str = "hello_world_plugin" # 内部标识符
enable_plugin: bool = True
dependencies: List[str] = [] # 插件依赖列表
python_dependencies: List[str] = ["asyncddgs","exa_py"] # Python包依赖列表
config_file_name: str = "config.toml" # 配置文件名
# 配置节描述
config_section_descriptions = {"plugin": "插件基本信息", "exa": "EXA相关配置", "components": "组件设置"}
# 配置Schema定义
config_schema: dict = {
"plugin": {
"name": ConfigField(type=str, default="WEB_SEARCH_PLUGIN", description="插件名称"),
"version": ConfigField(type=str, default="1.0.0", description="插件版本"),
"enabled": ConfigField(type=bool, default=False, description="是否启用插件"),
},
"exa":{
"api_key":ConfigField(type=str, default="None", description="exa的API密钥")
},
"components":{
"enable_web_search_tool":ConfigField(type=bool, default=True, description="是否启用联网搜索tool"),
"enable_url_tool":ConfigField(type=bool, default=True, description="是否启用URL解析tool")
}
}
def get_plugin_components(self) -> List[Tuple[ComponentInfo, Type]]:
enable_tool =[]
if self.get_config("components.enable_web_search_tool"):
enable_tool.append((WebSurfingTool.get_tool_info(), WebSurfingTool))
if self.get_config("components.enable_url_tool"):
enable_tool.append((URLParserTool.get_tool_info(), URLParserTool))
return enable_tool