Delete src/plugins/built_in/Maizone directory

This commit is contained in:
雅诺狐
2025-08-13 16:49:57 +08:00
committed by GitHub
parent 87596219d3
commit 6a699ff649
7 changed files with 0 additions and 2864 deletions

View File

@@ -1,50 +0,0 @@
{
"manifest_version": 1,
"name": "MaiZone麦麦空间",
"version": "2.0.0",
"description": "让你的麦麦发QQ空间说说、评论、点赞支持AI配图、定时发送和自动监控功能",
"author": {
"name": "MaiBot-Plus",
"url": "https://github.com/MaiBot-Plus"
},
"license": "AGPL-v3.0",
"host_application": {
"min_version": "0.8.0",
"max_version": "0.10.0"
},
"homepage_url": "https://github.com/MaiBot-Plus/MaiMbot-Pro-Max",
"repository_url": "https://github.com/MaiBot-Plus/MaiMbot-Pro-Max",
"keywords": ["QQ空间", "说说", "动态", "评论", "点赞", "自动化", "AI配图"],
"categories": ["社交", "自动化", "QQ空间"],
"plugin_info": {
"is_built_in": false,
"plugin_type": "social",
"components": [
{
"type": "action",
"name": "send_feed",
"description": "根据指定主题发送一条QQ空间说说"
},
{
"type": "action",
"name": "read_feed",
"description": "读取指定好友最近的说说,并评论点赞"
},
{
"type": "command",
"name": "send_feed",
"description": "通过命令发送QQ空间说说"
}
],
"features": [
"智能生成说说内容",
"AI自动配图硅基流动",
"自动点赞评论好友说说",
"定时发送说说",
"权限管理系统",
"历史记录避重"
]
}
}

View File

@@ -1,415 +0,0 @@
"""
MaiZone插件独立配置文件加载系统
这个模块提供了一个独立的配置文件加载系统用于替代原本插件中的config加载系统。
它支持TOML格式的配置文件具有配置验证、默认值处理、类型转换等功能。
"""
import toml
import shutil
import datetime
from typing import Dict, Any, Union, List, Optional, Type
from dataclasses import dataclass, field
from pathlib import Path
from src.common.logger import get_logger
logger = get_logger("MaiZone.ConfigLoader")
@dataclass
class ConfigFieldSpec:
"""配置字段规格定义"""
name: str
type_hint: Type
default: Any
description: str = ""
required: bool = False
choices: Optional[List[Any]] = None
min_value: Optional[Union[int, float]] = None
max_value: Optional[Union[int, float]] = None
def validate_value(self, value: Any) -> tuple[bool, str]:
"""验证配置值是否符合规格"""
# 类型检查
if not isinstance(value, self.type_hint):
try:
# 尝试类型转换
if self.type_hint == bool and isinstance(value, str):
value = value.lower() in ('true', '1', 'yes', 'on')
else:
value = self.type_hint(value)
except (ValueError, TypeError):
return False, f"类型错误: 期望 {self.type_hint.__name__}, 得到 {type(value).__name__}"
# 选择项检查
if self.choices and value not in self.choices:
return False, f"值不在允许范围内: {self.choices}"
# 数值范围检查
if isinstance(value, (int, float)):
if self.min_value is not None and value < self.min_value:
return False, f"值小于最小值 {self.min_value}"
if self.max_value is not None and value > self.max_value:
return False, f"值大于最大值 {self.max_value}"
return True, ""
@dataclass
class ConfigSectionSpec:
"""配置节规格定义"""
name: str
description: str = ""
fields: Dict[str, ConfigFieldSpec] = field(default_factory=dict)
def add_field(self, field_spec: ConfigFieldSpec):
"""添加字段规格"""
self.fields[field_spec.name] = field_spec
def validate_section(self, section_data: Dict[str, Any]) -> tuple[bool, List[str]]:
"""验证配置节数据"""
errors = []
# 检查必需字段
for field_name, field_spec in self.fields.items():
if field_spec.required and field_name not in section_data:
errors.append(f"缺少必需字段: {field_name}")
# 验证每个字段
for field_name, value in section_data.items():
if field_name in self.fields:
field_spec = self.fields[field_name]
is_valid, error_msg = field_spec.validate_value(value)
if not is_valid:
errors.append(f"{field_name}: {error_msg}")
else:
logger.warning(f"未知配置字段: {self.name}.{field_name}")
return len(errors) == 0, errors
class MaiZoneConfigLoader:
"""MaiZone插件独立配置加载器"""
def __init__(self, plugin_dir: str, config_filename: str = "config.toml"):
"""
初始化配置加载器
Args:
plugin_dir: 插件目录路径
config_filename: 配置文件名
"""
self.plugin_dir = Path(plugin_dir)
self.config_filename = config_filename
self.config_file_path = self.plugin_dir / config_filename
self.config_data: Dict[str, Any] = {}
self.config_specs: Dict[str, ConfigSectionSpec] = {}
self.config_version = "2.1.0"
def load_config(self) -> bool:
"""
加载配置文件
Returns:
bool: 是否成功加载
"""
try:
# 如果配置文件不存在,生成默认配置
if not self.config_file_path.exists():
logger.info(f"配置文件不存在,生成默认配置: {self.config_file_path}")
self._generate_default_config()
# 加载配置文件
with open(self.config_file_path, 'r', encoding='utf-8') as f:
self.config_data = toml.load(f)
logger.info(f"成功加载配置文件: {self.config_file_path}")
# 验证配置
self._validate_config()
# 检查版本并迁移
self._check_and_migrate_config()
return True
except Exception as e:
logger.error(f"加载配置文件失败: {e}")
return False
def _generate_default_config(self):
"""生成默认配置文件"""
try:
# 确保插件目录存在
self.plugin_dir.mkdir(parents=True, exist_ok=True)
# 生成默认配置数据
default_config = {}
for section_name, section_spec in self.config_specs.items():
section_data = {}
for field_name, field_spec in section_spec.fields.items():
section_data[field_name] = field_spec.default
default_config[section_name] = section_data
# 保存到文件
self._save_config_to_file(default_config)
self.config_data = default_config
logger.info(f"默认配置文件已生成: {self.config_file_path}")
except Exception as e:
logger.error(f"生成默认配置文件失败: {e}")
def _save_config_to_file(self, config_data: Dict[str, Any]):
"""保存配置到文件(带注释)"""
toml_content = "# MaiZone插件配置文件\n"
toml_content += "# 让你的麦麦发QQ空间说说、评论、点赞支持AI配图、定时发送和自动监控功能\n"
toml_content += f"# 配置版本: {self.config_version}\n\n"
for section_name, section_spec in self.config_specs.items():
if section_name not in config_data:
continue
# 添加节描述
toml_content += f"# {section_spec.description}\n"
toml_content += f"[{section_name}]\n\n"
section_data = config_data[section_name]
for field_name, field_spec in section_spec.fields.items():
if field_name not in section_data:
continue
# 添加字段描述
toml_content += f"# {field_spec.description}\n"
if field_spec.choices:
toml_content += f"# 可选值: {', '.join(map(str, field_spec.choices))}\n"
if field_spec.min_value is not None or field_spec.max_value is not None:
range_str = "# 范围: "
if field_spec.min_value is not None:
range_str += f"最小值 {field_spec.min_value}"
if field_spec.max_value is not None:
if field_spec.min_value is not None:
range_str += f", 最大值 {field_spec.max_value}"
else:
range_str += f"最大值 {field_spec.max_value}"
toml_content += range_str + "\n"
# 添加字段值
value = section_data[field_name]
if isinstance(value, str):
toml_content += f'{field_name} = "{value}"\n'
elif isinstance(value, bool):
toml_content += f"{field_name} = {str(value).lower()}\n"
elif isinstance(value, list):
# 格式化列表
if all(isinstance(item, str) for item in value):
formatted_list = "[" + ", ".join(f'"{item}"' for item in value) + "]"
elif all(isinstance(item, dict) for item in value):
# 处理字典列表如schedules
# 使用 TOML 内联表格式
formatted_items = []
for item in value:
# TOML 内联表中的字符串需要转义
item_str = ", ".join([f'{k} = "{str(v)}"' for k, v in item.items()])
formatted_items.append(f"{{ {item_str} }}")
formatted_list = "[\n " + ",\n ".join(formatted_items) + "\n]"
else:
formatted_list = str(value)
toml_content += f"{field_name} = {formatted_list}\n"
else:
toml_content += f"{field_name} = {value}\n"
toml_content += "\n"
toml_content += "\n"
# 写入文件
with open(self.config_file_path, 'w', encoding='utf-8') as f:
f.write(toml_content)
def _validate_config(self) -> bool:
"""验证配置数据"""
all_valid = True
for section_name, section_spec in self.config_specs.items():
if section_name not in self.config_data:
logger.warning(f"配置文件缺少节: {section_name}")
continue
section_data = self.config_data[section_name]
is_valid, errors = section_spec.validate_section(section_data)
if not is_valid:
logger.error(f"配置节 {section_name} 验证失败:")
for error in errors:
logger.error(f" - {error}")
all_valid = False
return all_valid
def _check_and_migrate_config(self):
"""检查配置版本并进行迁移"""
current_version = self.get_config("plugin.config_version", "1.0.0")
if current_version != self.config_version:
logger.info(f"检测到配置版本变更: {current_version} -> {self.config_version}")
# 备份旧配置
self._backup_config()
# 迁移配置
self._migrate_config(current_version, self.config_version)
# 更新版本号
self.config_data["plugin"]["config_version"] = self.config_version
# 保存迁移后的配置
self._save_config_to_file(self.config_data)
logger.info(f"配置已迁移到版本 {self.config_version}")
def _backup_config(self):
"""备份当前配置文件"""
if self.config_file_path.exists():
timestamp = datetime.datetime.now().strftime("%Y%m%d_%H%M%S")
backup_path = self.config_file_path.with_suffix(f".backup_{timestamp}.toml")
shutil.copy2(self.config_file_path, backup_path)
logger.info(f"配置文件已备份到: {backup_path}")
def _migrate_config(self, from_version: str, to_version: str):
"""迁移配置数据"""
# 创建新的配置结构
new_config = {}
for section_name, section_spec in self.config_specs.items():
new_section = {}
# 复制现有配置值
if section_name in self.config_data:
old_section = self.config_data[section_name]
for field_name, field_spec in section_spec.fields.items():
if field_name in old_section:
new_section[field_name] = old_section[field_name]
else:
new_section[field_name] = field_spec.default
logger.info(f"添加新配置项: {section_name}.{field_name} = {field_spec.default}")
else:
# 新增节,使用默认值
for field_name, field_spec in section_spec.fields.items():
new_section[field_name] = field_spec.default
logger.info(f"添加新配置节: {section_name}")
new_config[section_name] = new_section
self.config_data = new_config
def get_config(self, key: str, default: Any = None) -> Any:
"""
获取配置值,支持嵌套键访问
Args:
key: 配置键名,支持嵌套访问如 "section.field"
default: 默认值
Returns:
Any: 配置值或默认值
"""
keys = key.split('.')
current = self.config_data
for k in keys:
if isinstance(current, dict) and k in current:
current = current[k]
else:
return default
return current
def set_config(self, key: str, value: Any) -> bool:
"""
设置配置值
Args:
key: 配置键名
value: 配置值
Returns:
bool: 是否设置成功
"""
try:
keys = key.split('.')
if len(keys) != 2:
logger.error(f"配置键格式错误: {key},应为 'section.field' 格式")
return False
section_name, field_name = keys
# 检查节是否存在
if section_name not in self.config_specs:
logger.error(f"未知配置节: {section_name}")
return False
# 检查字段是否存在
if field_name not in self.config_specs[section_name].fields:
logger.error(f"未知配置字段: {key}")
return False
# 验证值
field_spec = self.config_specs[section_name].fields[field_name]
is_valid, error_msg = field_spec.validate_value(value)
if not is_valid:
logger.error(f"配置值验证失败 {key}: {error_msg}")
return False
# 设置值
if section_name not in self.config_data:
self.config_data[section_name] = {}
self.config_data[section_name][field_name] = value
logger.debug(f"设置配置: {key} = {value}")
return True
except Exception as e:
logger.error(f"设置配置失败 {key}: {e}")
return False
def save_config(self) -> bool:
"""
保存当前配置到文件
Returns:
bool: 是否保存成功
"""
try:
self._save_config_to_file(self.config_data)
logger.info(f"配置已保存到: {self.config_file_path}")
return True
except Exception as e:
logger.error(f"保存配置失败: {e}")
return False
def reload_config(self) -> bool:
"""
重新加载配置文件
Returns:
bool: 是否重新加载成功
"""
return self.load_config()
def get_config_info(self) -> Dict[str, Any]:
"""
获取配置信息
Returns:
Dict[str, Any]: 配置信息
"""
return {
"config_file": str(self.config_file_path),
"config_version": self.config_version,
"sections": list(self.config_specs.keys()),
"loaded": bool(self.config_data)
}

View File

@@ -1,240 +0,0 @@
import asyncio
import random
import time
import traceback
from typing import Dict, Any
from src.common.logger import get_logger
from src.plugin_system.apis import llm_api, config_api
# 导入工具模块
import sys
import os
sys.path.append(os.path.dirname(__file__))
from qzone_utils import QZoneManager
# 获取日志记录器
logger = get_logger('MaiZone-Monitor')
class MonitorManager:
"""监控管理器 - 负责自动监控好友说说并点赞评论"""
def __init__(self, plugin):
"""初始化监控管理器"""
self.plugin = plugin
self.is_running = False
self.task = None
self.last_check_time = 0
logger.info("监控管理器初始化完成")
async def start(self):
"""启动监控任务"""
if self.is_running:
logger.warning("监控任务已在运行中")
return
self.is_running = True
self.task = asyncio.create_task(self._monitor_loop())
logger.info("说说监控任务已启动")
async def stop(self):
"""停止监控任务"""
if not self.is_running:
return
self.is_running = False
if self.task:
self.task.cancel()
try:
await self.task
except asyncio.CancelledError:
logger.info("监控任务已被取消")
logger.info("说说监控任务已停止")
async def _monitor_loop(self):
"""监控任务主循环"""
while self.is_running:
try:
# 获取监控间隔配置
interval_minutes = int(self.plugin.get_config("monitor.interval_minutes", 10) or 10)
# 等待指定时间间隔
await asyncio.sleep(interval_minutes * 60)
# 执行监控检查
await self._check_and_process_feeds()
except asyncio.CancelledError:
logger.info("监控循环被取消")
break
except Exception as e:
logger.error(f"监控任务出错: {str(e)}")
logger.error(traceback.format_exc())
# 出错后等待5分钟再重试
await asyncio.sleep(300)
async def _check_and_process_feeds(self):
"""检查并处理好友说说"""
try:
# 获取配置
qq_account = config_api.get_global_config("bot.qq_account", "")
read_num = 10 # 监控时读取较少的说说数量
logger.info("监控任务: 开始检查好友说说")
# 创建QZone管理器 (监控模式不需要stream_id)
qzone_manager = QZoneManager()
# 获取监控说说列表
feeds_list = await qzone_manager.monitor_read_feed(qq_account, read_num)
if not feeds_list:
logger.info("监控任务: 未发现新说说")
return
logger.info(f"监控任务: 发现 {len(feeds_list)} 条新说说")
# 处理每条说说
for feed in feeds_list:
try:
await self._process_monitor_feed(feed, qzone_manager)
# 每条说说之间随机延迟
await asyncio.sleep(3 + random.random() * 2)
except Exception as e:
logger.error(f"处理监控说说失败: {str(e)}")
except Exception as e:
logger.error(f"监控检查失败: {str(e)}")
async def _process_monitor_feed(self, feed: Dict[str, Any], qzone_manager: QZoneManager):
"""处理单条监控说说"""
try:
# 提取说说信息
target_qq = feed.get("target_qq", "")
tid = feed.get("tid", "")
content = feed.get("content", "")
images = feed.get("images", [])
rt_con = feed.get("rt_con", "")
# 构建完整内容用于显示
full_content = content
if images:
full_content += f" [图片: {len(images)}张]"
if rt_con:
full_content += f" [转发: {rt_con[:20]}...]"
logger.info(f"监控处理说说: {target_qq} - {full_content[:30]}...")
# 获取配置
qq_account = config_api.get_global_config("bot.qq_account", "")
like_possibility = float(self.plugin.get_config("read.like_possibility", 1.0) or 1.0)
comment_possibility = float(self.plugin.get_config("read.comment_possibility", 0.3) or 0.3)
# 随机决定是否评论
if random.random() <= comment_possibility:
comment = await self._generate_monitor_comment(content, rt_con, target_qq)
if comment:
success = await qzone_manager.comment_feed(qq_account, target_qq, tid, comment)
if success:
logger.info(f"监控评论成功: '{comment}'")
else:
logger.error(f"监控评论失败: {content[:20]}...")
# 随机决定是否点赞
if random.random() <= like_possibility:
success = await qzone_manager.like_feed(qq_account, target_qq, tid)
if success:
logger.info(f"监控点赞成功: {content[:20]}...")
else:
logger.error(f"监控点赞失败: {content[:20]}...")
except Exception as e:
logger.error(f"处理监控说说异常: {str(e)}")
async def _generate_monitor_comment(self, content: str, rt_con: str, target_qq: str) -> str:
"""生成监控评论内容"""
try:
# 获取模型配置
models = llm_api.get_available_models()
text_model = str(self.plugin.get_config("models.text_model", "replyer_1"))
model_config = models.get(text_model)
if not model_config:
logger.error("未配置LLM模型")
return ""
# 获取机器人信息
bot_personality = config_api.get_global_config("personality.personality_core", "一个机器人")
bot_expression = config_api.get_global_config("expression.expression_style", "内容积极向上")
# 构建提示词
if not rt_con:
prompt = f"""
你是'{bot_personality}',你正在浏览你好友'{target_qq}'的QQ空间
你看到了你的好友'{target_qq}'qq空间上内容是'{content}'的说说,你想要发表你的一条评论,
{bot_expression},回复的平淡一些,简短一些,说中文,
不要刻意突出自身学科背景,不要浮夸,不要夸张修辞,不要输出多余内容(包括前后缀,冒号和引号,括号()表情包at或 @等 )。只输出回复内容
"""
else:
prompt = f"""
你是'{bot_personality}',你正在浏览你好友'{target_qq}'的QQ空间
你看到了你的好友'{target_qq}'在qq空间上转发了一条内容为'{rt_con}'的说说,你的好友的评论为'{content}'
你想要发表你的一条评论,{bot_expression},回复的平淡一些,简短一些,说中文,
不要刻意突出自身学科背景,不要浮夸,不要夸张修辞,不要输出多余内容(包括前后缀,冒号和引号,括号()表情包at或 @等 )。只输出回复内容
"""
logger.info(f"正在为 {target_qq} 的说说生成评论...")
# 生成评论
success, comment, reasoning, model_name = await llm_api.generate_with_model(
prompt=prompt,
model_config=model_config,
request_type="story.generate",
temperature=0.3,
max_tokens=1000
)
if success:
logger.info(f"成功生成监控评论: '{comment}'")
return comment
else:
logger.error("生成监控评论失败")
return ""
except Exception as e:
logger.error(f"生成监控评论异常: {str(e)}")
return ""
def get_status(self) -> Dict[str, Any]:
"""获取监控状态"""
return {
"is_running": self.is_running,
"interval_minutes": self.plugin.get_config("monitor.interval_minutes", 10),
"last_check_time": self.last_check_time,
"enabled": self.plugin.get_config("monitor.enable_auto_monitor", False)
}
async def manual_check(self) -> Dict[str, Any]:
"""手动执行一次监控检查"""
try:
logger.info("执行手动监控检查")
await self._check_and_process_feeds()
return {
"success": True,
"message": "手动监控检查完成",
"timestamp": time.time()
}
except Exception as e:
logger.error(f"手动监控检查失败: {str(e)}")
return {
"success": False,
"message": f"手动监控检查失败: {str(e)}",
"timestamp": time.time()
}

View File

@@ -1,824 +0,0 @@
import asyncio
import random
import time
from typing import List, Tuple, Type
from src.common.logger import get_logger
from src.plugin_system import (
BasePlugin, register_plugin, BaseAction, BaseCommand,
ComponentInfo, ActionActivationType, ChatMode
)
from src.plugin_system.apis import llm_api, config_api, person_api, generator_api
from src.plugin_system.base.config_types import ConfigField
# 导入插件工具模块
import sys
import os
sys.path.append(os.path.dirname(__file__))
from qzone_utils import (
QZoneManager, generate_image_by_sf, get_send_history
)
from scheduler import ScheduleManager
from config_loader import MaiZoneConfigLoader
# 获取日志记录器
logger = get_logger('MaiZone')
# ===== 发送说说命令组件 =====
class SendFeedCommand(BaseCommand):
"""发送说说命令 - 响应 /send_feed 命令"""
command_name = "send_feed"
command_description = "发送一条QQ空间说说"
command_pattern = r"^/send_feed(?:\s+(?P<topic>\w+))?$"
command_help = "发一条主题为<topic>或随机的说说"
command_examples = ["/send_feed", "/send_feed 日常"]
intercept_message = True
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
# 获取配置加载器引用
self.config_loader = None
self._init_config_loader()
def _init_config_loader(self):
"""初始化配置加载器"""
try:
plugin_dir = os.path.dirname(__file__)
self.config_loader = MaiZoneConfigLoader(plugin_dir)
self.config_loader.load_config()
except Exception as e:
logger.error(f"初始化配置加载器失败: {e}")
def get_config(self, key: str, default=None):
"""获取配置值"""
if self.config_loader:
return self.config_loader.get_config(key, default)
return default
def check_permission(self, qq_account: str) -> bool:
"""检查用户权限"""
permission_list = self.get_config("send.permission", [])
permission_type = self.get_config("send.permission_type", "whitelist")
logger.info(f'权限检查: {permission_type}:{permission_list}')
if not isinstance(permission_list, list):
logger.error("权限列表配置错误")
return False
if permission_type == 'whitelist':
return qq_account in permission_list
elif permission_type == 'blacklist':
return qq_account not in permission_list
else:
logger.error('权限类型配置错误,应为 whitelist 或 blacklist')
return False
async def execute(self) -> Tuple[bool, str, bool]:
"""执行发送说说命令"""
try:
# 获取用户信息
user_id = self.message.message_info.user_info.user_id if self.message and self.message.message_info and self.message.message_info.user_info else None
# 权限检查
if not user_id or not self.check_permission(user_id):
logger.info(f"用户 {user_id} 权限不足")
await self.send_text("权限不足,无法使用此命令")
return False, "权限不足", True
# 获取主题
topic = self.matched_groups.get("topic", "")
# 生成说说内容
story = await self._generate_story_content(topic)
if not story:
return False, "生成说说内容失败", True
# 处理图片
await self._handle_images(story)
# 发送说说
success = await self._send_feed(story)
if success:
if self.get_config("send.enable_reply", True):
await self.send_text(f"已发送说说:\n{story}")
return True, "发送成功", True
else:
return False, "发送说说失败", True
except Exception as e:
logger.error(f"发送说说命令执行失败: {str(e)}")
return False, "命令执行失败", True
async def _generate_story_content(self, topic: str) -> str:
"""生成说说内容"""
try:
# 获取模型配置
models = llm_api.get_available_models()
text_model = str(self.get_config("models.text_model", "replyer_1"))
model_config = models.get(text_model)
if not model_config:
logger.error("未配置LLM模型")
return ""
# 获取机器人信息
bot_personality = config_api.get_global_config("personality.personality_core", "一个机器人")
bot_expression = config_api.get_global_config("personality.reply_style", "内容积极向上")
qq_account = config_api.get_global_config("bot.qq_account", "")
# 构建提示词
if topic:
prompt = f"""
你是'{bot_personality}',你想写一条主题是'{topic}'的说说发表在qq空间上
{bot_expression}
不要刻意突出自身学科背景,不要浮夸,不要夸张修辞,可以适当使用颜文字,
只输出一条说说正文的内容,不要有其他的任何正文以外的冗余输出
"""
else:
prompt = f"""
你是'{bot_personality}'你想写一条说说发表在qq空间上主题不限
{bot_expression}
不要刻意突出自身学科背景,不要浮夸,不要夸张修辞,可以适当使用颜文字,
只输出一条说说正文的内容,不要有其他的任何正文以外的冗余输出
"""
# 添加历史记录
prompt += "\n以下是你以前发过的说说,写新说说时注意不要在相隔不长的时间发送相同主题的说说"
history_block = await get_send_history(qq_account)
if history_block:
prompt += history_block
# 生成内容
success, story, reasoning, model_name = await llm_api.generate_with_model(
prompt=prompt,
model_config=model_config,
request_type="story.generate",
temperature=0.3,
max_tokens=1000
)
if success:
logger.info(f"成功生成说说内容:'{story}'")
return story
else:
logger.error("生成说说内容失败")
return ""
except Exception as e:
logger.error(f"生成说说内容异常: {str(e)}")
return ""
async def _handle_images(self, story: str):
"""处理说说配图"""
try:
enable_ai_image = bool(self.get_config("send.enable_ai_image", False))
apikey = str(self.get_config("models.siliconflow_apikey", ""))
image_dir = str(self.get_config("send.image_directory", "./plugins/Maizone/images"))
image_num_raw = self.get_config("send.ai_image_number", 1)
image_num = int(image_num_raw if image_num_raw is not None else 1)
if enable_ai_image and apikey:
await generate_image_by_sf(
api_key=apikey,
story=story,
image_dir=image_dir,
batch_size=image_num
)
elif enable_ai_image and not apikey:
logger.error('启用了AI配图但未填写API密钥')
except Exception as e:
logger.error(f"处理配图失败: {str(e)}")
async def _send_feed(self, story: str) -> bool:
"""发送说说到QQ空间"""
try:
# 获取配置
qq_account = config_api.get_global_config("bot.qq_account", "")
enable_image = bool(self.get_config("send.enable_image", False))
image_dir = str(self.get_config("send.image_directory", "./plugins/Maizone/images"))
# 获取聊天流ID
stream_id = self.message.chat_stream.stream_id if self.message and self.message.chat_stream else None
# 创建QZone管理器并发送
qzone_manager = QZoneManager(stream_id)
success = await qzone_manager.send_feed(story, image_dir, qq_account, enable_image)
return success
except Exception as e:
logger.error(f"发送说说失败: {str(e)}")
return False
# ===== 发送说说动作组件 =====
class SendFeedAction(BaseAction):
"""发送说说动作 - 当用户要求发说说时激活"""
action_name = "send_feed"
action_description = "发一条相应主题的说说"
activation_type = ActionActivationType.KEYWORD
mode_enable = ChatMode.ALL
activation_keywords = ["说说", "空间", "动态"]
keyword_case_sensitive = False
action_parameters = {
"topic": "要发送的说说主题",
"user_name": "要求你发说说的好友的qq名称",
}
action_require = [
"用户要求发说说时使用",
"当有人希望你更新qq空间时使用",
"当你认为适合发说说时使用",
]
associated_types = ["text"]
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
# 获取配置加载器引用
self.config_loader = None
self._init_config_loader()
def _init_config_loader(self):
"""初始化配置加载器"""
try:
plugin_dir = os.path.dirname(__file__)
self.config_loader = MaiZoneConfigLoader(plugin_dir)
self.config_loader.load_config()
except Exception as e:
logger.error(f"初始化配置加载器失败: {e}")
def get_config(self, key: str, default=None):
"""获取配置值"""
if self.config_loader:
return self.config_loader.get_config(key, default)
return default
def check_permission(self, qq_account: str) -> bool:
"""检查用户权限"""
permission_list = self.get_config("send.permission", [])
permission_type = self.get_config("send.permission_type", "whitelist")
logger.info(f'权限检查: {permission_type}:{permission_list}')
if isinstance(permission_list, list):
if permission_type == 'whitelist':
return qq_account in permission_list
elif permission_type == 'blacklist':
return qq_account not in permission_list
logger.error('权限类型配置错误')
return False
async def execute(self) -> Tuple[bool, str]:
"""执行发送说说动作"""
try:
# 获取用户信息
user_name = self.action_data.get("user_name", "")
person_id = person_api.get_person_id_by_name(user_name)
user_id = await person_api.get_person_value(person_id, "user_id")
# 权限检查
if not self.check_permission(user_id):
logger.info(f"用户 {user_id} 权限不足")
success, reply_set, _ = await generator_api.generate_reply(
chat_stream=self.chat_stream,
action_data={"extra_info_block": f'{user_name}无权命令你发送说说,请用符合你人格特点的方式拒绝请求'}
)
if success and reply_set:
for reply_type, reply_content in reply_set:
if reply_type == "text":
await self.send_text(reply_content)
return False, "权限不足"
# 获取主题并生成内容
topic = self.action_data.get("topic", "")
story = await self._generate_story_content(topic)
if not story:
return False, "生成说说内容失败"
# 处理图片
await self._handle_images(story)
# 发送说说
success = await self._send_feed(story)
if success:
logger.info(f"成功发送说说: {story}")
# 生成回复
success, reply_set, _ = await generator_api.generate_reply(
chat_stream=self.chat_stream,
action_data={"extra_info_block": f'你刚刚发了一条说说,内容为{story}'}
)
if success and reply_set:
for reply_type, reply_content in reply_set:
if reply_type == "text":
await self.send_text(reply_content)
return True, '发送成功'
else:
await self.send_text('我发了一条说说啦~')
return True, '发送成功但回复生成失败'
else:
return False, "发送说说失败"
except Exception as e:
logger.error(f"发送说说动作执行失败: {str(e)}")
return False, "动作执行失败"
async def _generate_story_content(self, topic: str) -> str:
"""生成说说内容"""
try:
# 获取模型配置
models = llm_api.get_available_models()
text_model = str(self.get_config("models.text_model", "replyer_1"))
model_config = models.get(text_model)
if not model_config:
return ""
# 获取机器人信息
bot_personality = config_api.get_global_config("personality.personality_core", "一个机器人")
bot_expression = config_api.get_global_config("expression.expression_style", "内容积极向上")
qq_account = config_api.get_global_config("bot.qq_account", "")
# 构建提示词
prompt = f"""
你是{bot_personality},你想写一条主题是{topic}的说说发表在qq空间上
{bot_expression}
不要刻意突出自身学科背景,不要浮夸,不要夸张修辞,可以适当使用颜文字,
只输出一条说说正文的内容,不要有其他的任何正文以外的冗余输出
"""
# 添加历史记录
prompt += "\n以下是你以前发过的说说,写新说说时注意不要在相隔不长的时间发送相同主题的说说"
history_block = await get_send_history(qq_account)
if history_block:
prompt += history_block
# 生成内容
success, story, reasoning, model_name = await llm_api.generate_with_model(
prompt=prompt,
model_config=model_config,
request_type="story.generate",
temperature=0.3,
max_tokens=1000
)
if success:
return story
else:
return ""
except Exception as e:
logger.error(f"生成说说内容异常: {str(e)}")
return ""
async def _handle_images(self, story: str):
"""处理说说配图"""
try:
enable_ai_image = bool(self.get_config("send.enable_ai_image", False))
apikey = str(self.get_config("models.siliconflow_apikey", ""))
image_dir = str(self.get_config("send.image_directory", "./plugins/Maizone/images"))
image_num_raw = self.get_config("send.ai_image_number", 1)
image_num = int(image_num_raw if image_num_raw is not None else 1)
if enable_ai_image and apikey:
await generate_image_by_sf(
api_key=apikey,
story=story,
image_dir=image_dir,
batch_size=image_num
)
elif enable_ai_image and not apikey:
logger.error('启用了AI配图但未填写API密钥')
except Exception as e:
logger.error(f"处理配图失败: {str(e)}")
async def _send_feed(self, story: str) -> bool:
"""发送说说到QQ空间"""
try:
# 获取配置
qq_account = config_api.get_global_config("bot.qq_account", "")
enable_image = bool(self.get_config("send.enable_image", False))
image_dir = str(self.get_config("send.image_directory", "./plugins/Maizone/images"))
# 获取聊天流ID
stream_id = self.chat_stream.stream_id if self.chat_stream else None
# 创建QZone管理器并发送
qzone_manager = QZoneManager(stream_id)
success = await qzone_manager.send_feed(story, image_dir, qq_account, enable_image)
return success
except Exception as e:
logger.error(f"发送说说失败: {str(e)}")
return False
# ===== 阅读说说动作组件 =====
class ReadFeedAction(BaseAction):
"""阅读说说动作 - 当用户要求读说说时激活"""
action_name = "read_feed"
action_description = "读取好友最近的动态/说说/qq空间并评论点赞"
activation_type = ActionActivationType.KEYWORD
mode_enable = ChatMode.ALL
activation_keywords = ["说说", "空间", "动态"]
keyword_case_sensitive = False
action_parameters = {
"target_name": "需要阅读动态的好友的qq名称",
"user_name": "要求你阅读动态的好友的qq名称"
}
action_require = [
"需要阅读某人动态、说说、QQ空间时使用",
"当有人希望你评价某人的动态、说说、QQ空间",
"当你认为适合阅读说说、动态、QQ空间时使用",
]
associated_types = ["text"]
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
# 获取配置加载器引用
self.config_loader = None
self._init_config_loader()
def _init_config_loader(self):
"""初始化配置加载器"""
try:
plugin_dir = os.path.dirname(__file__)
self.config_loader = MaiZoneConfigLoader(plugin_dir)
self.config_loader.load_config()
except Exception as e:
logger.error(f"初始化配置加载器失败: {e}")
def get_config(self, key: str, default=None):
"""获取配置值"""
if self.config_loader:
return self.config_loader.get_config(key, default)
return default
def check_permission(self, qq_account: str) -> bool:
"""检查用户权限"""
permission_list = self.get_config("read.permission", [])
permission_type = self.get_config("read.permission_type", "blacklist")
if not isinstance(permission_list, list):
return False
logger.info(f'权限检查: {permission_type}:{permission_list}')
if permission_type == 'whitelist':
return qq_account in permission_list
elif permission_type == 'blacklist':
return qq_account not in permission_list
else:
logger.error('权限类型配置错误')
return False
async def execute(self) -> Tuple[bool, str]:
"""执行阅读说说动作"""
try:
# 获取用户信息
user_name = self.action_data.get("user_name", "")
person_id = person_api.get_person_id_by_name(user_name)
user_id = await person_api.get_person_value(person_id, "user_id")
# 权限检查
if not self.check_permission(user_id):
logger.info(f"用户 {user_id} 权限不足")
success, reply_set, _ = await generator_api.generate_reply(
chat_stream=self.chat_stream,
action_data={"extra_info_block": f'{user_name}无权命令你阅读说说,请用符合人格的方式进行拒绝的回复'}
)
if success and reply_set:
for reply_type, reply_content in reply_set:
if reply_type == "text":
await self.send_text(reply_content)
return False, "权限不足"
# 获取目标用户
target_name = self.action_data.get("target_name", "")
target_person_id = person_api.get_person_id_by_name(target_name)
target_qq = await person_api.get_person_value(target_person_id, "user_id")
# 读取并处理说说
success = await self._read_and_process_feeds(target_qq, target_name)
if success:
# 生成回复
success, reply_set, _ = await generator_api.generate_reply(
chat_stream=self.chat_stream,
action_data={"extra_info_block": f'你刚刚成功读了{target_name}的说说,请告知你已经读了说说'}
)
if success and reply_set:
for reply_type, reply_content in reply_set:
if reply_type == "text":
await self.send_text(reply_content)
return True, '阅读成功'
return True, '阅读成功但回复生成失败'
else:
return False, "阅读说说失败"
except Exception as e:
logger.error(f"阅读说说动作执行失败: {str(e)}")
return False, "动作执行失败"
async def _read_and_process_feeds(self, target_qq: str, target_name: str) -> bool:
"""读取并处理说说"""
try:
# 获取配置
qq_account = config_api.get_global_config("bot.qq_account", "")
num_raw = self.get_config("read.read_number", 5)
num = int(num_raw if num_raw is not None else 5)
like_raw = self.get_config("read.like_possibility", 1.0)
like_possibility = float(like_raw if like_raw is not None else 1.0)
comment_raw = self.get_config("read.comment_possibility", 1.0)
comment_possibility = float(comment_raw if comment_raw is not None else 1.0)
# 获取聊天流ID
stream_id = self.chat_stream.stream_id if self.chat_stream else None
# 创建QZone管理器并读取说说
qzone_manager = QZoneManager(stream_id)
feeds_list = await qzone_manager.read_feed(qq_account, target_qq, num)
# 处理错误情况
if isinstance(feeds_list, list) and len(feeds_list) > 0 and isinstance(feeds_list[0], dict) and 'error' in feeds_list[0]:
success, reply_set, _ = await generator_api.generate_reply(
chat_stream=self.chat_stream,
action_data={"extra_info_block": f'你在读取说说的时候出现了错误,错误原因:{feeds_list[0].get("error")}'}
)
if success and reply_set:
for reply_type, reply_content in reply_set:
if reply_type == "text":
await self.send_text(reply_content)
return True
# 处理说说列表
if isinstance(feeds_list, list):
logger.info(f"成功读取到{len(feeds_list)}条说说")
for feed in feeds_list:
# 随机延迟
time.sleep(3 + random.random())
# 处理说说内容
await self._process_single_feed(
feed, target_qq, target_name,
like_possibility, comment_possibility, qzone_manager
)
return True
else:
return False
except Exception as e:
logger.error(f"读取并处理说说失败: {str(e)}")
return False
async def _process_single_feed(self, feed: dict, target_qq: str, target_name: str,
like_possibility: float, comment_possibility: float,
qzone_manager):
"""处理单条说说"""
try:
content = feed.get("content", "")
images = feed.get("images", [])
if images:
for image in images:
content = content + str(image)
fid = feed.get("tid", "")
rt_con = feed.get("rt_con", "")
# 随机评论
if random.random() <= comment_possibility:
comment = await self._generate_comment(content, rt_con, target_name)
if comment:
success = await qzone_manager.comment_feed(
config_api.get_global_config("bot.qq_account", ""),
target_qq, fid, comment
)
if success:
logger.info(f"发送评论'{comment}'成功")
else:
logger.error(f"评论说说'{content[:20]}...'失败")
# 随机点赞
if random.random() <= like_possibility:
success = await qzone_manager.like_feed(
config_api.get_global_config("bot.qq_account", ""),
target_qq, fid
)
if success:
logger.info(f"点赞说说'{content[:10]}..'成功")
else:
logger.error(f"点赞说说'{content[:20]}...'失败")
except Exception as e:
logger.error(f"处理单条说说失败: {str(e)}")
async def _generate_comment(self, content: str, rt_con: str, target_name: str) -> str:
"""生成评论内容"""
try:
# 获取模型配置
models = llm_api.get_available_models()
text_model = str(self.get_config("models.text_model", "replyer_1"))
model_config = models.get(text_model)
if not model_config:
return ""
# 获取机器人信息
bot_personality = config_api.get_global_config("personality.personality_core", "一个机器人")
bot_expression = config_api.get_global_config("expression.expression_style", "内容积极向上")
# 构建提示词
if not rt_con:
prompt = f"""
你是'{bot_personality}',你正在浏览你好友'{target_name}'的QQ空间
你看到了你的好友'{target_name}'qq空间上内容是'{content}'的说说,你想要发表你的一条评论,
{bot_expression},回复的平淡一些,简短一些,说中文,
不要刻意突出自身学科背景,不要浮夸,不要夸张修辞,不要输出多余内容(包括前后缀,冒号和引号,括号()表情包at或 @等 )。只输出回复内容
"""
else:
prompt = f"""
你是'{bot_personality}',你正在浏览你好友'{target_name}'的QQ空间
你看到了你的好友'{target_name}'在qq空间上转发了一条内容为'{rt_con}'的说说,你的好友的评论为'{content}'
你想要发表你的一条评论,{bot_expression},回复的平淡一些,简短一些,说中文,
不要刻意突出自身学科背景,不要浮夸,不要夸张修辞,不要输出多余内容(包括前后缀,冒号和引号,括号()表情包at或 @等 )。只输出回复内容
"""
logger.info(f"正在评论'{target_name}'的说说:{content[:20]}...")
# 生成评论
success, comment, reasoning, model_name = await llm_api.generate_with_model(
prompt=prompt,
model_config=model_config,
request_type="story.generate",
temperature=0.3,
max_tokens=1000
)
if success:
logger.info(f"成功生成评论内容:'{comment}'")
return comment
else:
logger.error("生成评论内容失败")
return ""
except Exception as e:
logger.error(f"生成评论内容异常: {str(e)}")
return ""
# ===== 插件主类 =====
@register_plugin
class MaiZonePlugin(BasePlugin):
"""MaiZone插件 - 让麦麦发QQ空间"""
# 插件基本信息
plugin_name: str = "MaiZonePlugin"
enable_plugin: bool = True
dependencies: List[str] = []
python_dependencies: List[str] = []
config_file_name: str = "config.toml"
# 配置节描述
config_section_descriptions = {
"plugin": "插件基础配置",
"models": "模型相关配置",
"send": "发送说说配置",
"read": "阅读说说配置",
"monitor": "自动监控配置",
"schedule": "定时发送配置",
}
# 配置模式定义
config_schema: dict = {
"plugin": {
"enable": ConfigField(type=bool, default=True, description="是否启用插件"),
"config_version": ConfigField(type=str, default="2.1.0", description="配置文件版本"),
},
"models": {
"text_model": ConfigField(type=str, default="replyer_1", description="生成文本的模型名称"),
"siliconflow_apikey": ConfigField(type=str, default="", description="硅基流动AI生图API密钥"),
},
"send": {
"permission": ConfigField(type=list, default=['1145141919810'], description="发送权限QQ号列表"),
"permission_type": ConfigField(type=str, default='whitelist', description="权限类型whitelist(白名单) 或 blacklist(黑名单)"),
"enable_image": ConfigField(type=bool, default=False, description="是否启用说说配图"),
"enable_ai_image": ConfigField(type=bool, default=False, description="是否启用AI生成配图"),
"enable_reply": ConfigField(type=bool, default=True, description="生成完成时是否发出回复"),
"ai_image_number": ConfigField(type=int, default=1, description="AI生成图片数量(1-4张)"),
"image_directory": ConfigField(type=str, default="./plugins/built_in/Maizone/images", description="图片存储目录")
},
"read": {
"permission": ConfigField(type=list, default=[], description="阅读权限QQ号列表"),
"permission_type": ConfigField(type=str, default='blacklist', description="权限类型whitelist(白名单) 或 blacklist(黑名单)"),
"read_number": ConfigField(type=int, default=5, description="一次读取的说说数量"),
"like_possibility": ConfigField(type=float, default=1.0, description="点赞概率(0.0-1.0)"),
"comment_possibility": ConfigField(type=float, default=0.3, description="评论概率(0.0-1.0)"),
},
"monitor": {
"enable_auto_monitor": ConfigField(type=bool, default=False, description="是否启用自动监控好友说说"),
"interval_minutes": ConfigField(type=int, default=10, description="监控间隔时间(分钟)"),
},
"schedule": {
"enable_schedule": ConfigField(type=bool, default=False, description="是否启用定时发送说说"),
"schedules": ConfigField(
type=str,
default=r"""{"08:00" = "早安","22:00" = "晚安"}""",
description="定时发送任务列表, 格式为 {\"时间\"= \"主题\"}"
),
},
}
def __init__(self, *args, **kwargs):
"""初始化插件"""
super().__init__(*args, **kwargs)
# 设置插件信息
self.plugin_name = "MaiZone"
self.plugin_description = "让麦麦实现QQ空间点赞、评论、发说说功能"
self.plugin_version = "2.0.0"
self.plugin_author = "重构版"
self.config_file_name = "config.toml"
# 初始化独立配置加载器
plugin_dir = self.plugin_dir
if plugin_dir is None:
plugin_dir = os.path.dirname(__file__)
self.config_loader = MaiZoneConfigLoader(plugin_dir, self.config_file_name)
# 加载配置
if not self.config_loader.load_config():
logger.error("配置加载失败,使用默认设置")
# 获取启用状态
self.enable_plugin = self.config_loader.get_config("plugin.enable", True)
# 初始化管理器
self.monitor_manager = None
self.schedule_manager = None
# 根据配置启动功能
if self.enable_plugin:
self._init_managers()
def _init_managers(self):
"""初始化管理器"""
try:
# 初始化监控管理器
if self.config_loader.get_config("monitor.enable_auto_monitor", False):
from .monitor import MonitorManager
self.monitor_manager = MonitorManager(self)
asyncio.create_task(self._start_monitor_delayed())
# 初始化定时管理器
if self.config_loader.get_config("schedule.enable_schedule", False):
logger.info("定时任务启用状态: true")
self.schedule_manager = ScheduleManager(self)
asyncio.create_task(self._start_scheduler_delayed())
except Exception as e:
logger.error(f"初始化管理器失败: {str(e)}")
async def _start_monitor_delayed(self):
"""延迟启动监控管理器"""
try:
await asyncio.sleep(10) # 等待插件完全初始化
if self.monitor_manager:
await self.monitor_manager.start()
except Exception as e:
logger.error(f"启动监控管理器失败: {str(e)}")
async def _start_scheduler_delayed(self):
"""延迟启动定时管理器"""
try:
await asyncio.sleep(10) # 等待插件完全初始化
if self.schedule_manager:
await self.schedule_manager.start()
except Exception as e:
logger.error(f"启动定时管理器失败: {str(e)}")
def get_plugin_components(self) -> List[Tuple[ComponentInfo, Type]]:
"""获取插件组件列表"""
return [
(SendFeedAction.get_action_info(), SendFeedAction),
(ReadFeedAction.get_action_info(), ReadFeedAction),
(SendFeedCommand.get_command_info(), SendFeedCommand)
]

File diff suppressed because it is too large Load Diff

View File

@@ -1,286 +0,0 @@
import asyncio
import datetime
import time
import traceback
import os
from typing import Dict, Any
from src.common.logger import get_logger
from src.plugin_system.apis import llm_api, config_api
# 导入工具模块
import sys
sys.path.append(os.path.dirname(__file__))
from qzone_utils import QZoneManager, get_send_history
# 获取日志记录器
logger = get_logger('MaiZone-Scheduler')
class ScheduleManager:
"""定时任务管理器 - 负责定时发送说说"""
def __init__(self, plugin):
"""初始化定时任务管理器"""
self.plugin = plugin
self.is_running = False
self.task = None
self.last_send_times: Dict[str, float] = {} # 记录每个时间点的最后发送时间
logger.info("定时任务管理器初始化完成")
async def start(self):
"""启动定时任务"""
if self.is_running:
logger.warning("定时任务已在运行中")
return
self.is_running = True
self.task = asyncio.create_task(self._schedule_loop())
logger.info("定时发送说说任务已启动")
async def stop(self):
"""停止定时任务"""
if not self.is_running:
return
self.is_running = False
if self.task:
self.task.cancel()
try:
await self.task
except asyncio.CancelledError:
logger.info("定时任务已被取消")
logger.info("定时发送说说任务已停止")
async def _schedule_loop(self):
"""定时任务主循环"""
while self.is_running:
try:
# 检查定时任务是否启用
if not self.plugin.get_config("schedule.enable_schedule", False):
logger.info("定时任务已禁用,等待下次检查")
await asyncio.sleep(60)
continue
# 获取当前时间
current_time = datetime.datetime.now().strftime("%H:%M")
# 从插件配置中获取定时任务
schedules = self.plugin.get_config("schedule.schedules", {})
if not schedules:
logger.info("未找到有效的定时任务配置")
await asyncio.sleep(60)
continue
# 检查每个定时任务
for time_str, topic in schedules.items():
schedule = {"time": time_str, "topic": topic}
await self._check_and_execute_schedule(schedule, current_time)
# 每分钟检查一次
await asyncio.sleep(60)
except asyncio.CancelledError:
logger.info("定时任务循环被取消")
break
except Exception as e:
logger.error(f"定时任务循环出错: {str(e)}")
logger.error(traceback.format_exc())
# 出错后等待5分钟再重试
await asyncio.sleep(300)
async def _check_and_execute_schedule(self, schedule: Dict[str, Any], current_time: str):
"""检查并执行定时任务"""
try:
schedule_time = schedule.get("time", "")
topic = schedule.get("topic", "")
# 检查是否到达发送时间
if current_time == schedule_time:
# 避免同一分钟内重复发送
last_send_time = self.last_send_times.get(schedule_time, 0)
current_timestamp = time.time()
if current_timestamp - last_send_time > 60: # 超过1分钟才允许发送
logger.info(f"定时任务触发: {schedule_time} - 主题: {topic}")
self.last_send_times[schedule_time] = current_timestamp
# 执行发送任务
success = await self._execute_scheduled_send(topic)
if success:
logger.info(f"定时说说发送成功: {topic}")
else:
logger.error(f"定时说说发送失败: {topic}")
else:
logger.debug(f"跳过重复发送: {schedule_time}")
except Exception as e:
logger.error(f"检查定时任务失败: {str(e)}")
async def _execute_scheduled_send(self, topic: str) -> bool:
"""执行定时发送任务"""
try:
# 生成说说内容
story = await self._generate_story_content(topic)
if not story:
logger.error("生成定时说说内容失败")
return False
logger.info(f"定时任务生成说说内容: '{story}'")
# 处理配图
await self._handle_images(story)
# 发送说说
success = await self._send_scheduled_feed(story)
return success
except Exception as e:
logger.error(f"执行定时发送任务失败: {str(e)}")
return False
async def _generate_story_content(self, topic: str) -> str:
"""生成定时说说内容"""
try:
# 获取模型配置
models = llm_api.get_available_models()
text_model = str(self.plugin.get_config("models.text_model", "replyer_1"))
model_config = models.get(text_model)
if not model_config:
logger.error("未配置LLM模型")
return ""
# 获取机器人信息
bot_personality = config_api.get_global_config("personality.personality_core", "一个机器人")
bot_expression = config_api.get_global_config("expression.expression_style", "内容积极向上")
qq_account = config_api.get_global_config("bot.qq_account", "")
# 构建提示词
if topic:
prompt = f"""
你是'{bot_personality}',你想写一条主题是'{topic}'的说说发表在qq空间上
{bot_expression}
不要刻意突出自身学科背景,不要浮夸,不要夸张修辞,可以适当使用颜文字,
只输出一条说说正文的内容,不要有其他的任何正文以外的冗余输出
"""
else:
prompt = f"""
你是'{bot_personality}'你想写一条说说发表在qq空间上主题不限
{bot_expression}
不要刻意突出自身学科背景,不要浮夸,不要夸张修辞,可以适当使用颜文字,
只输出一条说说正文的内容,不要有其他的任何正文以外的冗余输出
"""
# 添加历史记录避免重复
prompt += "\n以下是你最近发过的说说,写新说说时注意不要在相隔不长的时间发送相似内容的说说\n"
history_block = await get_send_history(qq_account)
if history_block:
prompt += history_block
# 生成内容
success, story, reasoning, model_name = await llm_api.generate_with_model(
prompt=prompt,
model_config=model_config,
request_type="story.generate",
temperature=0.3,
max_tokens=1000
)
if success:
return story
else:
logger.error("生成定时说说内容失败")
return ""
except Exception as e:
logger.error(f"生成定时说说内容异常: {str(e)}")
return ""
async def _handle_images(self, story: str):
"""处理定时说说配图"""
try:
enable_ai_image = bool(self.plugin.get_config("send.enable_ai_image", False))
apikey = str(self.plugin.get_config("models.siliconflow_apikey", ""))
image_dir = str(self.plugin.get_config("send.image_directory", "./plugins/Maizone/images"))
image_num = int(self.plugin.get_config("send.ai_image_number", 1) or 1)
if enable_ai_image and apikey:
from qzone_utils import generate_image_by_sf
await generate_image_by_sf(
api_key=apikey,
story=story,
image_dir=image_dir,
batch_size=image_num
)
logger.info("定时任务AI配图生成完成")
elif enable_ai_image and not apikey:
logger.warning('启用了AI配图但未填写API密钥')
except Exception as e:
logger.error(f"处理定时说说配图失败: {str(e)}")
async def _send_scheduled_feed(self, story: str) -> bool:
"""发送定时说说"""
try:
# 获取配置
qq_account = config_api.get_global_config("bot.qq_account", "")
enable_image = self.plugin.get_config("send.enable_image", False)
image_dir = str(self.plugin.get_config("send.image_directory", "./plugins/Maizone/images"))
# 创建QZone管理器并发送 (定时任务不需要stream_id)
qzone_manager = QZoneManager()
success = await qzone_manager.send_feed(story, image_dir, qq_account, enable_image)
if success:
logger.info(f"定时说说发送成功: {story}")
else:
logger.error("定时说说发送失败")
return success
except Exception as e:
logger.error(f"发送定时说说失败: {str(e)}")
return False
def get_status(self) -> Dict[str, Any]:
"""获取定时任务状态"""
return {
"is_running": self.is_running,
"enabled": self.plugin.get_config("schedule.enable_schedule", False),
"schedules": self.plugin.get_config("schedule.schedules", {}),
"last_send_times": self.last_send_times
}
def add_schedule(self, time_str: str, topic: str) -> bool:
"""添加定时任务"""
schedules = self.plugin.get_config("schedule.schedules", {})
if time_str in schedules:
logger.warning(f"时间 {time_str} 已存在定时任务")
return False
schedules[time_str] = topic
# 注意:这里需要插件系统支持动态更新配置
logger.info(f"添加定时任务: {time_str} - {topic}")
return True
def remove_schedule(self, time_str: str) -> bool:
"""移除定时任务"""
schedules = self.plugin.get_config("schedule.schedules", {})
if time_str in schedules:
del schedules[time_str]
# 注意:这里需要插件系统支持动态更新配置
logger.info(f"移除定时任务: {time_str}")
return True
else:
logger.warning(f"未找到时间为 {time_str} 的定时任务")
return False