更新MaiZone插件配置:将定时发送任务列表格式从列表改为字典,优化配置读取方式(可算***修好了)

This commit is contained in:
minecraft1024a
2025-08-12 12:51:46 +08:00
parent 0ed19263da
commit f278c09486
5 changed files with 67 additions and 133 deletions

View File

@@ -164,10 +164,9 @@ class MaiZoneConfigLoader:
# 定时发送配置节
schedule_section = ConfigSectionSpec("schedule", "定时发送配置")
schedule_section.add_field(ConfigFieldSpec("enable_schedule", bool, False, "是否启用定时发送说说"))
schedule_section.add_field(ConfigFieldSpec("schedules", list, [
{"time": "08:00", "topic": ""},
{"time": "22:00", "topic": "晚安"}
], "定时发送任务列表"))
schedule_section.add_field(ConfigFieldSpec("schedules", dict,
{"08:00": "早安", "22:00": ""},
"定时发送任务列表, 格式为 {\"时间\": \"主题\"}"))
self.config_specs["schedule"] = schedule_section
def load_config(self) -> bool:
@@ -270,10 +269,13 @@ class MaiZoneConfigLoader:
formatted_list = "[" + ", ".join(f'"{item}"' for item in value) + "]"
elif all(isinstance(item, dict) for item in value):
# 处理字典列表如schedules
# 使用 TOML 内联表格式
formatted_items = []
for item in value:
formatted_items.append("{" + ", ".join(f'{k} = "{v}"' for k, v in item.items()) + "}")
formatted_list = "[\n " + ",\n ".join(formatted_items) + ",\n]"
# TOML 内联表中的字符串需要转义
item_str = ", ".join([f'{k} = "{str(v)}"' for k, v in item.items()])
formatted_items.append(f"{{ {item_str} }}")
formatted_list = "[\n " + ",\n ".join(formatted_items) + "\n]"
else:
formatted_list = str(value)
toml_content += f"{field_name} = {formatted_list}\n"

View File

@@ -720,7 +720,7 @@ class MaiZonePlugin(BasePlugin):
"siliconflow_apikey": ConfigField(type=str, default="", description="硅基流动AI生图API密钥"),
},
"send": {
"permission": ConfigField(type=list, default=['2488036428'], description="发送权限QQ号列表"),
"permission": ConfigField(type=list, default=['1145141919810'], description="发送权限QQ号列表"),
"permission_type": ConfigField(type=str, default='whitelist', description="权限类型whitelist(白名单) 或 blacklist(黑名单)"),
"enable_image": ConfigField(type=bool, default=False, description="是否启用说说配图"),
"enable_ai_image": ConfigField(type=bool, default=False, description="是否启用AI生成配图"),
@@ -742,12 +742,9 @@ class MaiZonePlugin(BasePlugin):
"schedule": {
"enable_schedule": ConfigField(type=bool, default=False, description="是否启用定时发送说说"),
"schedules": ConfigField(
type=list,
default=[
{"time": "08:00", "topic": "早安"},
{"time": "22:00", "topic": "晚安"}
],
description="定时发送任务列表"
type=str,
default=r"""{"08:00" = "早安","22:00" = "晚安"}""",
description="定时发送任务列表, 格式为 {\"时间\"= \"主题\"}"
),
},
}

View File

@@ -28,7 +28,9 @@ class CookieManager:
@staticmethod
def get_cookie_file_path(uin: str) -> str:
"""获取Cookie文件路径"""
return os.path.join(os.getcwd(), 'plugins/Maizone/', f"cookies-{uin}.json")
# .parents 向上追溯四级目录,到达 mmc 根目录
base_path = Path(__file__).resolve().parents[5]
return str(base_path / 'src' / 'plugins'/ 'built_in' / 'Maizone' / f"cookies-{uin}.json")
@staticmethod
def parse_cookie_string(cookie_str: str) -> Dict[str, str]:
@@ -82,7 +84,7 @@ class CookieManager:
uin = CookieManager.extract_uin_from_cookie(cookie_str)
file_path = CookieManager.get_cookie_file_path(uin)
with open(file_path, "w", encoding="utf-8") as f:
json.dump(parsed_cookies, f, indent=4, ensure_ascii=False)
@@ -153,9 +155,9 @@ class QZoneAPI:
self,
method: str,
url: str,
params: Dict = None,
data: Dict = None,
headers: Dict = None,
params: Optional[Dict] = None,
data: Optional[Dict] = None,
headers: Optional[Dict] = None,
timeout: int = 10
) -> requests.Response:
"""执行HTTP请求"""
@@ -271,7 +273,7 @@ class QZoneAPI:
logger.error(f"提取图片信息失败: {str(e)}")
raise
async def publish_emotion(self, content: str, images: List[bytes] = None) -> str:
async def publish_emotion(self, content: str, images: Optional[List[bytes]] = None) -> str:
"""发布说说"""
if images is None:
images = []
@@ -597,7 +599,10 @@ class QZoneAPI:
try:
json_data = json5.loads(data)
feeds_data = json_data['data']['data']
if json_data and isinstance(json_data, dict):
feeds_data = json_data.get('data', {}).get('data', [])
else:
feeds_data = []
except Exception as e:
logger.error(f"解析JSON数据失败: {str(e)}")
return []
@@ -659,8 +664,8 @@ class QZoneAPI:
like_btn = soup.find('a', class_='qz_like_btn_v3')
if not like_btn:
like_btn = soup.find('a', attrs={'data-islike': True})
if like_btn:
if isinstance(like_btn, bs4.element.Tag):
data_islike = like_btn.get('data-islike')
if data_islike == '1': # 已点赞,跳过
return None
@@ -680,10 +685,10 @@ class QZoneAPI:
# 提取图片
images = []
img_box = soup.find('div', class_='img-box')
if img_box:
if isinstance(img_box, bs4.element.Tag):
for img in img_box.find_all('img'):
src = img.get('src')
if src and not src.startswith('http://qzonestyle.gtimg.cn'):
src = img.get('src') if isinstance(img, bs4.element.Tag) else None
if src and isinstance(src, str) and not src.startswith('http://qzonestyle.gtimg.cn'):
try:
image_base64 = await self._get_image_base64_by_url(src)
image_manager = get_image_manager()
@@ -694,14 +699,16 @@ class QZoneAPI:
# 视频缩略图
img_tag = soup.select_one('div.video-img img')
if img_tag and 'src' in img_tag.attrs:
try:
image_base64 = await self._get_image_base64_by_url(img_tag['src'])
image_manager = get_image_manager()
description = await image_manager.get_image_description(image_base64)
images.append(f"视频缩略图: {description}")
except Exception as e:
logger.warning(f"处理视频缩略图失败: {str(e)}")
if isinstance(img_tag, bs4.element.Tag):
src = img_tag.get('src')
if src and isinstance(src, str):
try:
image_base64 = await self._get_image_base64_by_url(src)
image_manager = get_image_manager()
description = await image_manager.get_image_description(image_base64)
images.append(f"视频缩略图: {description}")
except Exception as e:
logger.warning(f"处理视频缩略图失败: {str(e)}")
# 视频URL
videos = []

View File

@@ -3,7 +3,7 @@ import datetime
import time
import traceback
import os
import toml
import json
from typing import Dict, List, Any
from src.common.logger import get_logger
@@ -29,65 +29,8 @@ class ScheduleManager:
self.is_running = False
self.task = None
self.last_send_times: Dict[str, float] = {} # 记录每个时间点的最后发送时间
self.config_file_path = os.path.join(os.path.dirname(__file__), "config.toml")
logger.info("定时任务管理器初始化完成")
# 初始化时测试配置读取
def _read_schedule_config(self) -> List[Dict[str, str]]:
"""直接从TOML配置文件读取日程配置"""
try:
if not os.path.exists(self.config_file_path):
logger.error(f"配置文件不存在: {self.config_file_path}")
return []
# 读取TOML文件
with open(self.config_file_path, 'r', encoding='utf-8') as f:
config_data = toml.load(f)
# 获取schedule配置
schedule_config = config_data.get('schedule', {})
schedules = schedule_config.get('schedules', [])
logger.info(f"从配置文件读取到 {len(schedules)} 个定时任务")
# 验证每个日程的格式
valid_schedules = []
for i, schedule in enumerate(schedules):
if isinstance(schedule, dict) and 'time' in schedule and 'topic' in schedule:
valid_schedules.append({
'time': str(schedule['time']),
'topic': str(schedule['topic'])
})
logger.debug(f"日程 {i+1}: {schedule['time']} - {schedule['topic']}")
else:
logger.warning(f"跳过无效的日程配置: {schedule}")
return valid_schedules
except Exception as e:
logger.error(f"读取日程配置失败: {str(e)}")
return []
def _is_schedule_enabled(self) -> bool:
"""检查定时任务是否启用"""
try:
if not os.path.exists(self.config_file_path):
return False
with open(self.config_file_path, 'r', encoding='utf-8') as f:
config_data = toml.load(f)
schedule_config = config_data.get('schedule', {})
enabled = schedule_config.get('enable_schedule', False)
logger.debug(f"定时任务启用状态: {enabled}")
return bool(enabled)
except Exception as e:
logger.error(f"检查定时任务启用状态失败: {str(e)}")
return False
async def start(self):
"""启动定时任务"""
@@ -120,7 +63,7 @@ class ScheduleManager:
while self.is_running:
try:
# 检查定时任务是否启用
if not self._is_schedule_enabled():
if not self.plugin.get_config("schedule.enable_schedule", False):
logger.info("定时任务已禁用,等待下次检查")
await asyncio.sleep(60)
continue
@@ -128,16 +71,17 @@ class ScheduleManager:
# 获取当前时间
current_time = datetime.datetime.now().strftime("%H:%M")
# 直接从配置文件读取定时任务配置
schedules = self._read_schedule_config()
# 从插件配置中获取定时任务
schedules = self.plugin.get_config("schedule.schedules", {})
if not schedules:
logger.info("未找到有效的定时任务配置")
await asyncio.sleep(60)
continue
# 检查每个定时任务
for schedule in schedules:
for time_str, topic in schedules.items():
schedule = {"time": time_str, "topic": topic}
await self._check_and_execute_schedule(schedule, current_time)
# 每分钟检查一次
@@ -314,49 +258,33 @@ class ScheduleManager:
"""获取定时任务状态"""
return {
"is_running": self.is_running,
"enabled": self._is_schedule_enabled(),
"schedules": self._read_schedule_config(),
"enabled": self.plugin.get_config("schedule.enable_schedule", False),
"schedules": self.plugin.get_config("schedule.schedules", {}),
"last_send_times": self.last_send_times
}
def add_schedule(self, time_str: str, topic: str) -> bool:
"""添加定时任务"""
try:
schedules = self.plugin.get_config("schedule.schedules", [])
new_schedule = {"time": time_str, "topic": topic}
# 检查是否已存在相同时间的任务
for schedule in schedules:
if isinstance(schedule, dict) and schedule.get("time") == time_str:
logger.warning(f"时间 {time_str} 已存在定时任务")
return False
schedules.append(new_schedule)
# 注意:这里需要插件系统支持动态更新配置
logger.info(f"添加定时任务: {time_str} - {topic}")
return True
except Exception as e:
logger.error(f"添加定时任务失败: {str(e)}")
schedules = self.plugin.get_config("schedule.schedules", {})
if time_str in schedules:
logger.warning(f"时间 {time_str} 已存在定时任务")
return False
schedules[time_str] = topic
# 注意:这里需要插件系统支持动态更新配置
logger.info(f"添加定时任务: {time_str} - {topic}")
return True
def remove_schedule(self, time_str: str) -> bool:
"""移除定时任务"""
try:
schedules = self.plugin.get_config("schedule.schedules", [])
original_count = len(schedules)
# 过滤掉指定时间的任务
schedules = [s for s in schedules if not (isinstance(s, dict) and s.get("time") == time_str)]
if len(schedules) < original_count:
# 注意:这里需要插件系统支持动态更新配置
logger.info(f"移除定时任务: {time_str}")
return True
else:
logger.warning(f"未找到时间为 {time_str} 的定时任务")
return False
except Exception as e:
logger.error(f"移除定时任务失败: {str(e)}")
schedules = self.plugin.get_config("schedule.schedules", {})
if time_str in schedules:
del schedules[time_str]
# 注意:这里需要插件系统支持动态更新配置
logger.info(f"移除定时任务: {time_str}")
return True
else:
logger.warning(f"未找到时间为 {time_str} 的定时任务")
return False

View File

@@ -339,7 +339,7 @@ class URLParserTool(BaseTool):
class WEBSEARCHPLUGIN(BasePlugin):
# 插件基本信息
plugin_name: str = "hello_world_plugin" # 内部标识符
plugin_name: str = "web_search_tool" # 内部标识符
enable_plugin: bool = True
dependencies: List[str] = [] # 插件依赖列表
python_dependencies: List[str] = ["asyncddgs","exa_py"] # Python包依赖列表