diff --git a/.gitignore b/.gitignore index 61ce5df22..7a346860e 100644 --- a/.gitignore +++ b/.gitignore @@ -25,6 +25,7 @@ log_debug/ run_amds.bat run_none.bat run.py +cookies-*.json message_queue_content.txt message_queue_content.bat message_queue_window.bat diff --git a/requirements.txt b/requirements.txt index c16e924bc..ed575c2f8 100644 --- a/requirements.txt +++ b/requirements.txt @@ -49,3 +49,12 @@ scikit-learn seaborn structlog watchdog +httpx>=0.24.0 +requests>=2.28.0 +beautifulsoup4>=4.11.0 +lxml>=4.9.0 +json5>=0.9.0 +toml>=0.10.0 +beautifulsoup4 +exa_py +asyncddgs diff --git a/src/chat/emoji_system/emoji_manager.py b/src/chat/emoji_system/emoji_manager.py index bb0171d76..c04c456f8 100644 --- a/src/chat/emoji_system/emoji_manager.py +++ b/src/chat/emoji_system/emoji_manager.py @@ -655,7 +655,7 @@ class EmojiManager: self._ensure_db() logger.debug("[数据库] 开始加载所有表情包记录 ...") - emoji_instances = session.execute(stmt = select(Emoji)).scalars().all() + emoji_instances = session.execute(select(Emoji)).scalars().all() emoji_objects, load_errors = _to_emoji_objects(emoji_instances) # 更新内存中的列表和数量 @@ -684,12 +684,12 @@ class EmojiManager: self._ensure_db() if emoji_hash: - session.execute(select(Emoji).where(Emoji.emoji_hash == emoji_hash)).scalars().all() + query = session.execute(select(Emoji).where(Emoji.emoji_hash == emoji_hash)).scalars().all() else: logger.warning( "[查询] 未提供 hash,将尝试加载所有表情包,建议使用 get_all_emoji_from_db 更新管理器状态。" ) - query = session.execute(select(Emoji)).scalars().all() + query = session.execute(select(Emoji)).scalars().all() emoji_instances = query emoji_objects, load_errors = _to_emoji_objects(emoji_instances) diff --git a/src/plugins/built_in/Maizone/__init__.py b/src/plugins/built_in/Maizone/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/src/plugins/built_in/Maizone/_manifest.json b/src/plugins/built_in/Maizone/_manifest.json new file mode 100644 index 000000000..8f353a6e2 --- /dev/null +++ b/src/plugins/built_in/Maizone/_manifest.json @@ -0,0 +1,50 @@ +{ + "manifest_version": 1, + "name": "MaiZone(麦麦空间)", + "version": "2.0.0", + "description": "让你的麦麦发QQ空间说说、评论、点赞,支持AI配图、定时发送和自动监控功能", + "author": { + "name": "重构版", + "url": "https://github.com/internetsb" + }, + "license": "AGPL-v3.0", + + "host_application": { + "min_version": "0.8.0", + "max_version": "0.10.0" + }, + "homepage_url": "https://github.com/internetsb/Maizone", + "repository_url": "https://github.com/internetsb/Maizone", + "keywords": ["QQ空间", "说说", "动态", "评论", "点赞", "自动化", "AI配图"], + "categories": ["社交", "自动化", "QQ空间"], + + "plugin_info": { + "is_built_in": false, + "plugin_type": "social", + "components": [ + { + "type": "action", + "name": "send_feed", + "description": "根据指定主题发送一条QQ空间说说" + }, + { + "type": "action", + "name": "read_feed", + "description": "读取指定好友最近的说说,并评论点赞" + }, + { + "type": "command", + "name": "send_feed", + "description": "通过命令发送QQ空间说说" + } + ], + "features": [ + "智能生成说说内容", + "AI自动配图(硅基流动)", + "自动点赞评论好友说说", + "定时发送说说", + "权限管理系统", + "历史记录避重" + ] + } +} \ No newline at end of file diff --git a/src/plugins/built_in/Maizone/config_loader.py b/src/plugins/built_in/Maizone/config_loader.py new file mode 100644 index 000000000..b7e2f1c99 --- /dev/null +++ b/src/plugins/built_in/Maizone/config_loader.py @@ -0,0 +1,474 @@ +""" +MaiZone插件独立配置文件加载系统 + +这个模块提供了一个独立的配置文件加载系统,用于替代原本插件中的config加载系统。 +它支持TOML格式的配置文件,具有配置验证、默认值处理、类型转换等功能。 +""" + +import os +import toml +import shutil +import datetime +from typing import Dict, Any, Union, List, Optional, Type +from dataclasses import dataclass, field +from pathlib import Path + +from src.common.logger import get_logger + +logger = get_logger("MaiZone.ConfigLoader") + + +@dataclass +class ConfigFieldSpec: + """配置字段规格定义""" + name: str + type_hint: Type + default: Any + description: str = "" + required: bool = False + choices: Optional[List[Any]] = None + min_value: Optional[Union[int, float]] = None + max_value: Optional[Union[int, float]] = None + + def validate_value(self, value: Any) -> tuple[bool, str]: + """验证配置值是否符合规格""" + # 类型检查 + if not isinstance(value, self.type_hint): + try: + # 尝试类型转换 + if self.type_hint == bool and isinstance(value, str): + value = value.lower() in ('true', '1', 'yes', 'on') + else: + value = self.type_hint(value) + except (ValueError, TypeError): + return False, f"类型错误: 期望 {self.type_hint.__name__}, 得到 {type(value).__name__}" + + # 选择项检查 + if self.choices and value not in self.choices: + return False, f"值不在允许范围内: {self.choices}" + + # 数值范围检查 + if isinstance(value, (int, float)): + if self.min_value is not None and value < self.min_value: + return False, f"值小于最小值 {self.min_value}" + if self.max_value is not None and value > self.max_value: + return False, f"值大于最大值 {self.max_value}" + + return True, "" + + +@dataclass +class ConfigSectionSpec: + """配置节规格定义""" + name: str + description: str = "" + fields: Dict[str, ConfigFieldSpec] = field(default_factory=dict) + + def add_field(self, field_spec: ConfigFieldSpec): + """添加字段规格""" + self.fields[field_spec.name] = field_spec + + def validate_section(self, section_data: Dict[str, Any]) -> tuple[bool, List[str]]: + """验证配置节数据""" + errors = [] + + # 检查必需字段 + for field_name, field_spec in self.fields.items(): + if field_spec.required and field_name not in section_data: + errors.append(f"缺少必需字段: {field_name}") + + # 验证每个字段 + for field_name, value in section_data.items(): + if field_name in self.fields: + field_spec = self.fields[field_name] + is_valid, error_msg = field_spec.validate_value(value) + if not is_valid: + errors.append(f"{field_name}: {error_msg}") + else: + logger.warning(f"未知配置字段: {self.name}.{field_name}") + + return len(errors) == 0, errors + + +class MaiZoneConfigLoader: + """MaiZone插件独立配置加载器""" + + def __init__(self, plugin_dir: str, config_filename: str = "config.toml"): + """ + 初始化配置加载器 + + Args: + plugin_dir: 插件目录路径 + config_filename: 配置文件名 + """ + self.plugin_dir = Path(plugin_dir) + self.config_filename = config_filename + self.config_file_path = self.plugin_dir / config_filename + self.config_data: Dict[str, Any] = {} + self.config_specs: Dict[str, ConfigSectionSpec] = {} + self.config_version = "2.0.0" + + # 初始化配置规格 + self._init_config_specs() + + def _init_config_specs(self): + """初始化配置规格定义""" + # 插件基础配置节 + plugin_section = ConfigSectionSpec("plugin", "插件基础配置") + plugin_section.add_field(ConfigFieldSpec("enable", bool, True, "是否启用插件")) + plugin_section.add_field(ConfigFieldSpec("config_version", str, "2.0.0", "配置文件版本")) + plugin_section.add_field(ConfigFieldSpec("http_port", str, "3000", "NapCat HTTP服务器端口号")) + plugin_section.add_field(ConfigFieldSpec("http_host", str, "127.0.0.1", "NapCat HTTP服务器地址")) + self.config_specs["plugin"] = plugin_section + + # 模型相关配置节 + models_section = ConfigSectionSpec("models", "模型相关配置") + models_section.add_field(ConfigFieldSpec("text_model", str, "replyer_1", "生成文本的模型名称")) + models_section.add_field(ConfigFieldSpec("siliconflow_apikey", str, "", "硅基流动AI生图API密钥")) + self.config_specs["models"] = models_section + + # 发送说说配置节 + send_section = ConfigSectionSpec("send", "发送说说配置") + send_section.add_field(ConfigFieldSpec("permission", list, ["2488036428"], "发送权限QQ号列表")) + send_section.add_field(ConfigFieldSpec("permission_type", str, "whitelist", "权限类型", + choices=["whitelist", "blacklist"])) + send_section.add_field(ConfigFieldSpec("enable_image", bool, False, "是否启用说说配图")) + send_section.add_field(ConfigFieldSpec("enable_ai_image", bool, False, "是否启用AI生成配图")) + send_section.add_field(ConfigFieldSpec("enable_reply", bool, True, "生成完成时是否发出回复")) + send_section.add_field(ConfigFieldSpec("ai_image_number", int, 1, "AI生成图片数量", + min_value=1, max_value=4)) + send_section.add_field(ConfigFieldSpec("image_directory", str, "./plugins/built_in/Maizone/images", + "图片存储目录")) + self.config_specs["send"] = send_section + + # 阅读说说配置节 + read_section = ConfigSectionSpec("read", "阅读说说配置") + read_section.add_field(ConfigFieldSpec("permission", list, [], "阅读权限QQ号列表")) + read_section.add_field(ConfigFieldSpec("permission_type", str, "blacklist", "权限类型", + choices=["whitelist", "blacklist"])) + read_section.add_field(ConfigFieldSpec("read_number", int, 5, "一次读取的说说数量", + min_value=1, max_value=20)) + read_section.add_field(ConfigFieldSpec("like_possibility", float, 1.0, "点赞概率", + min_value=0.0, max_value=1.0)) + read_section.add_field(ConfigFieldSpec("comment_possibility", float, 0.3, "评论概率", + min_value=0.0, max_value=1.0)) + self.config_specs["read"] = read_section + + # 自动监控配置节 + monitor_section = ConfigSectionSpec("monitor", "自动监控配置") + monitor_section.add_field(ConfigFieldSpec("enable_auto_monitor", bool, False, "是否启用自动监控好友说说")) + monitor_section.add_field(ConfigFieldSpec("interval_minutes", int, 10, "监控间隔时间(分钟)", + min_value=1, max_value=1440)) + self.config_specs["monitor"] = monitor_section + + # 定时发送配置节 + schedule_section = ConfigSectionSpec("schedule", "定时发送配置") + schedule_section.add_field(ConfigFieldSpec("enable_schedule", bool, False, "是否启用定时发送说说")) + schedule_section.add_field(ConfigFieldSpec("schedules", list, [ + {"time": "08:00", "topic": "早安"}, + {"time": "22:00", "topic": "晚安"} + ], "定时发送任务列表")) + self.config_specs["schedule"] = schedule_section + + def load_config(self) -> bool: + """ + 加载配置文件 + + Returns: + bool: 是否成功加载 + """ + try: + # 如果配置文件不存在,生成默认配置 + if not self.config_file_path.exists(): + logger.info(f"配置文件不存在,生成默认配置: {self.config_file_path}") + self._generate_default_config() + + # 加载配置文件 + with open(self.config_file_path, 'r', encoding='utf-8') as f: + self.config_data = toml.load(f) + + logger.info(f"成功加载配置文件: {self.config_file_path}") + + # 验证配置 + self._validate_config() + + # 检查版本并迁移 + self._check_and_migrate_config() + + return True + + except Exception as e: + logger.error(f"加载配置文件失败: {e}") + return False + + def _generate_default_config(self): + """生成默认配置文件""" + try: + # 确保插件目录存在 + self.plugin_dir.mkdir(parents=True, exist_ok=True) + + # 生成默认配置数据 + default_config = {} + for section_name, section_spec in self.config_specs.items(): + section_data = {} + for field_name, field_spec in section_spec.fields.items(): + section_data[field_name] = field_spec.default + default_config[section_name] = section_data + + # 保存到文件 + self._save_config_to_file(default_config) + self.config_data = default_config + + logger.info(f"默认配置文件已生成: {self.config_file_path}") + + except Exception as e: + logger.error(f"生成默认配置文件失败: {e}") + + def _save_config_to_file(self, config_data: Dict[str, Any]): + """保存配置到文件(带注释)""" + toml_content = f"# MaiZone插件配置文件\n" + toml_content += f"# 让你的麦麦发QQ空间说说、评论、点赞,支持AI配图、定时发送和自动监控功能\n" + toml_content += f"# 配置版本: {self.config_version}\n\n" + + for section_name, section_spec in self.config_specs.items(): + if section_name not in config_data: + continue + + # 添加节描述 + toml_content += f"# {section_spec.description}\n" + toml_content += f"[{section_name}]\n\n" + + section_data = config_data[section_name] + for field_name, field_spec in section_spec.fields.items(): + if field_name not in section_data: + continue + + # 添加字段描述 + toml_content += f"# {field_spec.description}\n" + if field_spec.choices: + toml_content += f"# 可选值: {', '.join(map(str, field_spec.choices))}\n" + if field_spec.min_value is not None or field_spec.max_value is not None: + range_str = f"# 范围: " + if field_spec.min_value is not None: + range_str += f"最小值 {field_spec.min_value}" + if field_spec.max_value is not None: + if field_spec.min_value is not None: + range_str += f", 最大值 {field_spec.max_value}" + else: + range_str += f"最大值 {field_spec.max_value}" + toml_content += range_str + "\n" + + # 添加字段值 + value = section_data[field_name] + if isinstance(value, str): + toml_content += f'{field_name} = "{value}"\n' + elif isinstance(value, bool): + toml_content += f"{field_name} = {str(value).lower()}\n" + elif isinstance(value, list): + # 格式化列表 + if all(isinstance(item, str) for item in value): + formatted_list = "[" + ", ".join(f'"{item}"' for item in value) + "]" + elif all(isinstance(item, dict) for item in value): + # 处理字典列表(如schedules) + formatted_items = [] + for item in value: + formatted_items.append("{" + ", ".join(f'{k} = "{v}"' for k, v in item.items()) + "}") + formatted_list = "[\n " + ",\n ".join(formatted_items) + ",\n]" + else: + formatted_list = str(value) + toml_content += f"{field_name} = {formatted_list}\n" + else: + toml_content += f"{field_name} = {value}\n" + + toml_content += "\n" + + toml_content += "\n" + + # 写入文件 + with open(self.config_file_path, 'w', encoding='utf-8') as f: + f.write(toml_content) + + def _validate_config(self) -> bool: + """验证配置数据""" + all_valid = True + + for section_name, section_spec in self.config_specs.items(): + if section_name not in self.config_data: + logger.warning(f"配置文件缺少节: {section_name}") + continue + + section_data = self.config_data[section_name] + is_valid, errors = section_spec.validate_section(section_data) + + if not is_valid: + logger.error(f"配置节 {section_name} 验证失败:") + for error in errors: + logger.error(f" - {error}") + all_valid = False + + return all_valid + + def _check_and_migrate_config(self): + """检查配置版本并进行迁移""" + current_version = self.get_config("plugin.config_version", "1.0.0") + + if current_version != self.config_version: + logger.info(f"检测到配置版本变更: {current_version} -> {self.config_version}") + + # 备份旧配置 + self._backup_config() + + # 迁移配置 + self._migrate_config(current_version, self.config_version) + + # 更新版本号 + self.config_data["plugin"]["config_version"] = self.config_version + + # 保存迁移后的配置 + self._save_config_to_file(self.config_data) + + logger.info(f"配置已迁移到版本 {self.config_version}") + + def _backup_config(self): + """备份当前配置文件""" + if self.config_file_path.exists(): + timestamp = datetime.datetime.now().strftime("%Y%m%d_%H%M%S") + backup_path = self.config_file_path.with_suffix(f".backup_{timestamp}.toml") + shutil.copy2(self.config_file_path, backup_path) + logger.info(f"配置文件已备份到: {backup_path}") + + def _migrate_config(self, from_version: str, to_version: str): + """迁移配置数据""" + # 创建新的配置结构 + new_config = {} + + for section_name, section_spec in self.config_specs.items(): + new_section = {} + + # 复制现有配置值 + if section_name in self.config_data: + old_section = self.config_data[section_name] + for field_name, field_spec in section_spec.fields.items(): + if field_name in old_section: + new_section[field_name] = old_section[field_name] + else: + new_section[field_name] = field_spec.default + logger.info(f"添加新配置项: {section_name}.{field_name} = {field_spec.default}") + else: + # 新增节,使用默认值 + for field_name, field_spec in section_spec.fields.items(): + new_section[field_name] = field_spec.default + logger.info(f"添加新配置节: {section_name}") + + new_config[section_name] = new_section + + self.config_data = new_config + + def get_config(self, key: str, default: Any = None) -> Any: + """ + 获取配置值,支持嵌套键访问 + + Args: + key: 配置键名,支持嵌套访问如 "section.field" + default: 默认值 + + Returns: + Any: 配置值或默认值 + """ + keys = key.split('.') + current = self.config_data + + for k in keys: + if isinstance(current, dict) and k in current: + current = current[k] + else: + return default + + return current + + def set_config(self, key: str, value: Any) -> bool: + """ + 设置配置值 + + Args: + key: 配置键名 + value: 配置值 + + Returns: + bool: 是否设置成功 + """ + try: + keys = key.split('.') + if len(keys) != 2: + logger.error(f"配置键格式错误: {key},应为 'section.field' 格式") + return False + + section_name, field_name = keys + + # 检查节是否存在 + if section_name not in self.config_specs: + logger.error(f"未知配置节: {section_name}") + return False + + # 检查字段是否存在 + if field_name not in self.config_specs[section_name].fields: + logger.error(f"未知配置字段: {key}") + return False + + # 验证值 + field_spec = self.config_specs[section_name].fields[field_name] + is_valid, error_msg = field_spec.validate_value(value) + if not is_valid: + logger.error(f"配置值验证失败 {key}: {error_msg}") + return False + + # 设置值 + if section_name not in self.config_data: + self.config_data[section_name] = {} + + self.config_data[section_name][field_name] = value + logger.debug(f"设置配置: {key} = {value}") + + return True + + except Exception as e: + logger.error(f"设置配置失败 {key}: {e}") + return False + + def save_config(self) -> bool: + """ + 保存当前配置到文件 + + Returns: + bool: 是否保存成功 + """ + try: + self._save_config_to_file(self.config_data) + logger.info(f"配置已保存到: {self.config_file_path}") + return True + except Exception as e: + logger.error(f"保存配置失败: {e}") + return False + + def reload_config(self) -> bool: + """ + 重新加载配置文件 + + Returns: + bool: 是否重新加载成功 + """ + return self.load_config() + + def get_config_info(self) -> Dict[str, Any]: + """ + 获取配置信息 + + Returns: + Dict[str, Any]: 配置信息 + """ + return { + "config_file": str(self.config_file_path), + "config_version": self.config_version, + "sections": list(self.config_specs.keys()), + "loaded": bool(self.config_data) + } diff --git a/src/plugins/built_in/Maizone/monitor.py b/src/plugins/built_in/Maizone/monitor.py new file mode 100644 index 000000000..0bcce47f2 --- /dev/null +++ b/src/plugins/built_in/Maizone/monitor.py @@ -0,0 +1,242 @@ +import asyncio +import random +import time +import traceback +from typing import List, Dict, Any + +from src.common.logger import get_logger +from src.plugin_system.apis import llm_api, config_api + +# 导入工具模块 +import sys +import os +sys.path.append(os.path.dirname(__file__)) + +from qzone_utils import QZoneManager + +# 获取日志记录器 +logger = get_logger('MaiZone-Monitor') + + +class MonitorManager: + """监控管理器 - 负责自动监控好友说说并点赞评论""" + + def __init__(self, plugin): + """初始化监控管理器""" + self.plugin = plugin + self.is_running = False + self.task = None + self.last_check_time = 0 + + logger.info("监控管理器初始化完成") + + async def start(self): + """启动监控任务""" + if self.is_running: + logger.warning("监控任务已在运行中") + return + + self.is_running = True + self.task = asyncio.create_task(self._monitor_loop()) + logger.info("说说监控任务已启动") + + async def stop(self): + """停止监控任务""" + if not self.is_running: + return + + self.is_running = False + + if self.task: + self.task.cancel() + try: + await self.task + except asyncio.CancelledError: + logger.info("监控任务已被取消") + + logger.info("说说监控任务已停止") + + async def _monitor_loop(self): + """监控任务主循环""" + while self.is_running: + try: + # 获取监控间隔配置 + interval_minutes = int(self.plugin.get_config("monitor.interval_minutes", 10) or 10) + + # 等待指定时间间隔 + await asyncio.sleep(interval_minutes * 60) + + # 执行监控检查 + await self._check_and_process_feeds() + + except asyncio.CancelledError: + logger.info("监控循环被取消") + break + except Exception as e: + logger.error(f"监控任务出错: {str(e)}") + logger.error(traceback.format_exc()) + # 出错后等待5分钟再重试 + await asyncio.sleep(300) + + async def _check_and_process_feeds(self): + """检查并处理好友说说""" + try: + # 获取配置 + qq_account = config_api.get_global_config("bot.qq_account", "") + port = str(self.plugin.get_config("plugin.http_port", "3000")) + host = str(self.plugin.get_config("plugin.http_host", "127.0.0.1")) + read_num = 10 # 监控时读取较少的说说数量 + + logger.info("监控任务: 开始检查好友说说") + + # 创建QZone管理器 + qzone_manager = QZoneManager(port, host) + + # 获取监控说说列表 + feeds_list = await qzone_manager.monitor_read_feed(qq_account, read_num) + + if not feeds_list: + logger.info("监控任务: 未发现新说说") + return + + logger.info(f"监控任务: 发现 {len(feeds_list)} 条新说说") + + # 处理每条说说 + for feed in feeds_list: + try: + await self._process_monitor_feed(feed, qzone_manager) + # 每条说说之间随机延迟 + await asyncio.sleep(3 + random.random() * 2) + except Exception as e: + logger.error(f"处理监控说说失败: {str(e)}") + + except Exception as e: + logger.error(f"监控检查失败: {str(e)}") + + async def _process_monitor_feed(self, feed: Dict[str, Any], qzone_manager: QZoneManager): + """处理单条监控说说""" + try: + # 提取说说信息 + target_qq = feed.get("target_qq", "") + tid = feed.get("tid", "") + content = feed.get("content", "") + images = feed.get("images", []) + rt_con = feed.get("rt_con", "") + + # 构建完整内容用于显示 + full_content = content + if images: + full_content += f" [图片: {len(images)}张]" + if rt_con: + full_content += f" [转发: {rt_con[:20]}...]" + + logger.info(f"监控处理说说: {target_qq} - {full_content[:30]}...") + + # 获取配置 + qq_account = config_api.get_global_config("bot.qq_account", "") + like_possibility = float(self.plugin.get_config("read.like_possibility", 1.0) or 1.0) + comment_possibility = float(self.plugin.get_config("read.comment_possibility", 0.3) or 0.3) + + # 随机决定是否评论 + if random.random() <= comment_possibility: + comment = await self._generate_monitor_comment(content, rt_con, target_qq) + if comment: + success = await qzone_manager.comment_feed(qq_account, target_qq, tid, comment) + if success: + logger.info(f"监控评论成功: '{comment}'") + else: + logger.error(f"监控评论失败: {content[:20]}...") + + # 随机决定是否点赞 + if random.random() <= like_possibility: + success = await qzone_manager.like_feed(qq_account, target_qq, tid) + if success: + logger.info(f"监控点赞成功: {content[:20]}...") + else: + logger.error(f"监控点赞失败: {content[:20]}...") + + except Exception as e: + logger.error(f"处理监控说说异常: {str(e)}") + + async def _generate_monitor_comment(self, content: str, rt_con: str, target_qq: str) -> str: + """生成监控评论内容""" + try: + # 获取模型配置 + models = llm_api.get_available_models() + text_model = str(self.plugin.get_config("models.text_model", "replyer_1")) + model_config = models.get(text_model) + + if not model_config: + logger.error("未配置LLM模型") + return "" + + # 获取机器人信息 + bot_personality = config_api.get_global_config("personality.personality_core", "一个机器人") + bot_expression = config_api.get_global_config("expression.expression_style", "内容积极向上") + + # 构建提示词 + if not rt_con: + prompt = f""" + 你是'{bot_personality}',你正在浏览你好友'{target_qq}'的QQ空间, + 你看到了你的好友'{target_qq}'qq空间上内容是'{content}'的说说,你想要发表你的一条评论, + {bot_expression},回复的平淡一些,简短一些,说中文, + 不要刻意突出自身学科背景,不要浮夸,不要夸张修辞,不要输出多余内容(包括前后缀,冒号和引号,括号(),表情包,at或 @等 )。只输出回复内容 + """ + else: + prompt = f""" + 你是'{bot_personality}',你正在浏览你好友'{target_qq}'的QQ空间, + 你看到了你的好友'{target_qq}'在qq空间上转发了一条内容为'{rt_con}'的说说,你的好友的评论为'{content}' + 你想要发表你的一条评论,{bot_expression},回复的平淡一些,简短一些,说中文, + 不要刻意突出自身学科背景,不要浮夸,不要夸张修辞,不要输出多余内容(包括前后缀,冒号和引号,括号(),表情包,at或 @等 )。只输出回复内容 + """ + + logger.info(f"正在为 {target_qq} 的说说生成评论...") + + # 生成评论 + success, comment, reasoning, model_name = await llm_api.generate_with_model( + prompt=prompt, + model_config=model_config, + request_type="story.generate", + temperature=0.3, + max_tokens=1000 + ) + + if success: + logger.info(f"成功生成监控评论: '{comment}'") + return comment + else: + logger.error("生成监控评论失败") + return "" + + except Exception as e: + logger.error(f"生成监控评论异常: {str(e)}") + return "" + + def get_status(self) -> Dict[str, Any]: + """获取监控状态""" + return { + "is_running": self.is_running, + "interval_minutes": self.plugin.get_config("monitor.interval_minutes", 10), + "last_check_time": self.last_check_time, + "enabled": self.plugin.get_config("monitor.enable_auto_monitor", False) + } + + async def manual_check(self) -> Dict[str, Any]: + """手动执行一次监控检查""" + try: + logger.info("执行手动监控检查") + await self._check_and_process_feeds() + + return { + "success": True, + "message": "手动监控检查完成", + "timestamp": time.time() + } + + except Exception as e: + logger.error(f"手动监控检查失败: {str(e)}") + return { + "success": False, + "message": f"手动监控检查失败: {str(e)}", + "timestamp": time.time() + } diff --git a/src/plugins/built_in/Maizone/plugin.py b/src/plugins/built_in/Maizone/plugin.py new file mode 100644 index 000000000..3de352d98 --- /dev/null +++ b/src/plugins/built_in/Maizone/plugin.py @@ -0,0 +1,829 @@ +import asyncio +import random +import time +import traceback +from typing import List, Tuple, Type, Union, Any, Optional + +from src.common.logger import get_logger +from src.plugin_system import ( + BasePlugin, register_plugin, BaseAction, BaseCommand, + ComponentInfo, ActionActivationType, ChatMode +) +from src.plugin_system.apis import llm_api, config_api, person_api, generator_api +from src.plugin_system.base.config_types import ConfigField + +# 导入插件工具模块 +import sys +import os +sys.path.append(os.path.dirname(__file__)) + +from qzone_utils import ( + QZoneManager, generate_image_by_sf, get_send_history +) +from scheduler import ScheduleManager +from config_loader import MaiZoneConfigLoader + +# 获取日志记录器 +logger = get_logger('MaiZone') + + +# ===== 发送说说命令组件 ===== +class SendFeedCommand(BaseCommand): + """发送说说命令 - 响应 /send_feed 命令""" + + command_name = "send_feed" + command_description = "发送一条QQ空间说说" + command_pattern = r"^/send_feed(?:\s+(?P\w+))?$" + command_help = "发一条主题为或随机的说说" + command_examples = ["/send_feed", "/send_feed 日常"] + intercept_message = True + + def __init__(self, *args, **kwargs): + super().__init__(*args, **kwargs) + # 获取配置加载器引用 + self.config_loader = None + self._init_config_loader() + + def _init_config_loader(self): + """初始化配置加载器""" + try: + plugin_dir = os.path.dirname(__file__) + self.config_loader = MaiZoneConfigLoader(plugin_dir) + self.config_loader.load_config() + except Exception as e: + logger.error(f"初始化配置加载器失败: {e}") + + def get_config(self, key: str, default=None): + """获取配置值""" + if self.config_loader: + return self.config_loader.get_config(key, default) + return default + + def check_permission(self, qq_account: str) -> bool: + """检查用户权限""" + + permission_list = self.get_config("send.permission", []) + permission_type = self.get_config("send.permission_type", "whitelist") + + logger.info(f'权限检查: {permission_type}:{permission_list}') + + if not isinstance(permission_list, list): + logger.error("权限列表配置错误") + return False + + if permission_type == 'whitelist': + return qq_account in permission_list + elif permission_type == 'blacklist': + return qq_account not in permission_list + else: + logger.error('权限类型配置错误,应为 whitelist 或 blacklist') + return False + + async def execute(self) -> Tuple[bool, str, bool]: + """执行发送说说命令""" + try: + # 获取用户信息 + user_id = self.message.message_info.user_info.user_id if self.message and self.message.message_info and self.message.message_info.user_info else None + + # 权限检查 + if not user_id or not self.check_permission(user_id): + logger.info(f"用户 {user_id} 权限不足") + await self.send_text(f"权限不足,无法使用此命令") + return False, "权限不足", True + + # 获取主题 + topic = self.matched_groups.get("topic", "") + + # 生成说说内容 + story = await self._generate_story_content(topic) + if not story: + return False, "生成说说内容失败", True + + # 处理图片 + await self._handle_images(story) + + # 发送说说 + success = await self._send_feed(story) + if success: + if self.get_config("send.enable_reply", True): + await self.send_text(f"已发送说说:\n{story}") + return True, "发送成功", True + else: + return False, "发送说说失败", True + + except Exception as e: + logger.error(f"发送说说命令执行失败: {str(e)}") + return False, "命令执行失败", True + + async def _generate_story_content(self, topic: str) -> str: + """生成说说内容""" + try: + # 获取模型配置 + models = llm_api.get_available_models() + text_model = str(self.get_config("models.text_model", "replyer_1")) + model_config = models.get(text_model) + + if not model_config: + logger.error("未配置LLM模型") + return "" + + # 获取机器人信息 + bot_personality = config_api.get_global_config("personality.personality_core", "一个机器人") + bot_expression = config_api.get_global_config("personality.reply_style", "内容积极向上") + qq_account = config_api.get_global_config("bot.qq_account", "") + + # 构建提示词 + if topic: + prompt = f""" + 你是'{bot_personality}',你想写一条主题是'{topic}'的说说发表在qq空间上, + {bot_expression} + 不要刻意突出自身学科背景,不要浮夸,不要夸张修辞,可以适当使用颜文字, + 只输出一条说说正文的内容,不要有其他的任何正文以外的冗余输出 + """ + else: + prompt = f""" + 你是'{bot_personality}',你想写一条说说发表在qq空间上,主题不限 + {bot_expression} + 不要刻意突出自身学科背景,不要浮夸,不要夸张修辞,可以适当使用颜文字, + 只输出一条说说正文的内容,不要有其他的任何正文以外的冗余输出 + """ + + # 添加历史记录 + prompt += "\n以下是你以前发过的说说,写新说说时注意不要在相隔不长的时间发送相同主题的说说" + history_block = await get_send_history(qq_account) + if history_block: + prompt += history_block + + # 生成内容 + success, story, reasoning, model_name = await llm_api.generate_with_model( + prompt=prompt, + model_config=model_config, + request_type="story.generate", + temperature=0.3, + max_tokens=1000 + ) + + if success: + logger.info(f"成功生成说说内容:'{story}'") + return story + else: + logger.error("生成说说内容失败") + return "" + + except Exception as e: + logger.error(f"生成说说内容异常: {str(e)}") + return "" + + async def _handle_images(self, story: str): + """处理说说配图""" + try: + enable_ai_image = bool(self.get_config("send.enable_ai_image", False)) + apikey = str(self.get_config("models.siliconflow_apikey", "")) + image_dir = str(self.get_config("send.image_directory", "./plugins/Maizone/images")) + image_num_raw = self.get_config("send.ai_image_number", 1) + image_num = int(image_num_raw if image_num_raw is not None else 1) + + if enable_ai_image and apikey: + await generate_image_by_sf( + api_key=apikey, + story=story, + image_dir=image_dir, + batch_size=image_num + ) + elif enable_ai_image and not apikey: + logger.error('启用了AI配图但未填写API密钥') + + except Exception as e: + logger.error(f"处理配图失败: {str(e)}") + + async def _send_feed(self, story: str) -> bool: + """发送说说到QQ空间""" + try: + # 获取配置 + port = str(self.get_config("plugin.http_port", "3000")) + host = str(self.get_config("plugin.http_host", "127.0.0.1")) + qq_account = config_api.get_global_config("bot.qq_account", "") + enable_image = bool(self.get_config("send.enable_image", False)) + image_dir = str(self.get_config("send.image_directory", "./plugins/Maizone/images")) + + # 创建QZone管理器并发送 + qzone_manager = QZoneManager(port, host) + success = await qzone_manager.send_feed(story, image_dir, qq_account, enable_image) + + return success + + except Exception as e: + logger.error(f"发送说说失败: {str(e)}") + return False + + +# ===== 发送说说动作组件 ===== +class SendFeedAction(BaseAction): + """发送说说动作 - 当用户要求发说说时激活""" + + action_name = "send_feed" + action_description = "发一条相应主题的说说" + activation_type = ActionActivationType.KEYWORD + mode_enable = ChatMode.ALL + + activation_keywords = ["说说", "空间", "动态"] + keyword_case_sensitive = False + + action_parameters = { + "topic": "要发送的说说主题", + "user_name": "要求你发说说的好友的qq名称", + } + action_require = [ + "用户要求发说说时使用", + "当有人希望你更新qq空间时使用", + "当你认为适合发说说时使用", + ] + associated_types = ["text"] + + def __init__(self, *args, **kwargs): + super().__init__(*args, **kwargs) + # 获取配置加载器引用 + self.config_loader = None + self._init_config_loader() + + def _init_config_loader(self): + """初始化配置加载器""" + try: + plugin_dir = os.path.dirname(__file__) + self.config_loader = MaiZoneConfigLoader(plugin_dir) + self.config_loader.load_config() + except Exception as e: + logger.error(f"初始化配置加载器失败: {e}") + + def get_config(self, key: str, default=None): + """获取配置值""" + if self.config_loader: + return self.config_loader.get_config(key, default) + return default + + def check_permission(self, qq_account: str) -> bool: + """检查用户权限""" + permission_list = self.get_config("send.permission", []) + permission_type = self.get_config("send.permission_type", "whitelist") + + logger.info(f'权限检查: {permission_type}:{permission_list}') + + if isinstance(permission_list, list): + if permission_type == 'whitelist': + return qq_account in permission_list + elif permission_type == 'blacklist': + return qq_account not in permission_list + + logger.error('权限类型配置错误') + return False + + async def execute(self) -> Tuple[bool, str]: + if self.plugin_dir is None: + self.plugin_dir = os.path.dirname(__file__) + """执行发送说说动作""" + try: + # 获取用户信息 + user_name = self.action_data.get("user_name", "") + person_id = person_api.get_person_id_by_name(user_name) + user_id = await person_api.get_person_value(person_id, "user_id") + + # 权限检查 + if not self.check_permission(user_id): + logger.info(f"用户 {user_id} 权限不足") + success, reply_set, _ = await generator_api.generate_reply( + chat_stream=self.chat_stream, + action_data={"extra_info_block": f'{user_name}无权命令你发送说说,请用符合你人格特点的方式拒绝请求'} + ) + if success and reply_set: + for reply_type, reply_content in reply_set: + if reply_type == "text": + await self.send_text(reply_content) + return False, "权限不足" + + # 获取主题并生成内容 + topic = self.action_data.get("topic", "") + story = await self._generate_story_content(topic) + if not story: + return False, "生成说说内容失败" + + # 处理图片 + await self._handle_images(story) + + # 发送说说 + success = await self._send_feed(story) + if success: + logger.info(f"成功发送说说: {story}") + + # 生成回复 + success, reply_set, _ = await generator_api.generate_reply( + chat_stream=self.chat_stream, + action_data={"extra_info_block": f'你刚刚发了一条说说,内容为{story}'} + ) + + if success and reply_set: + for reply_type, reply_content in reply_set: + if reply_type == "text": + await self.send_text(reply_content) + return True, '发送成功' + else: + await self.send_text('我发了一条说说啦~') + return True, '发送成功但回复生成失败' + else: + return False, "发送说说失败" + + except Exception as e: + logger.error(f"发送说说动作执行失败: {str(e)}") + return False, "动作执行失败" + + async def _generate_story_content(self, topic: str) -> str: + """生成说说内容""" + try: + # 获取模型配置 + models = llm_api.get_available_models() + text_model = str(self.get_config("models.text_model", "replyer_1")) + model_config = models.get(text_model) + + if not model_config: + return "" + + # 获取机器人信息 + bot_personality = config_api.get_global_config("personality.personality_core", "一个机器人") + bot_expression = config_api.get_global_config("expression.expression_style", "内容积极向上") + qq_account = config_api.get_global_config("bot.qq_account", "") + + # 构建提示词 + prompt = f""" + 你是{bot_personality},你想写一条主题是{topic}的说说发表在qq空间上, + {bot_expression} + 不要刻意突出自身学科背景,不要浮夸,不要夸张修辞,可以适当使用颜文字, + 只输出一条说说正文的内容,不要有其他的任何正文以外的冗余输出 + """ + + # 添加历史记录 + prompt += "\n以下是你以前发过的说说,写新说说时注意不要在相隔不长的时间发送相同主题的说说" + history_block = await get_send_history(qq_account) + if history_block: + prompt += history_block + + # 生成内容 + success, story, reasoning, model_name = await llm_api.generate_with_model( + prompt=prompt, + model_config=model_config, + request_type="story.generate", + temperature=0.3, + max_tokens=1000 + ) + + if success: + return story + else: + return "" + + except Exception as e: + logger.error(f"生成说说内容异常: {str(e)}") + return "" + + async def _handle_images(self, story: str): + """处理说说配图""" + try: + enable_ai_image = bool(self.get_config("send.enable_ai_image", False)) + apikey = str(self.get_config("models.siliconflow_apikey", "")) + image_dir = str(self.get_config("send.image_directory", "./plugins/Maizone/images")) + image_num_raw = self.get_config("send.ai_image_number", 1) + image_num = int(image_num_raw if image_num_raw is not None else 1) + + if enable_ai_image and apikey: + await generate_image_by_sf( + api_key=apikey, + story=story, + image_dir=image_dir, + batch_size=image_num + ) + elif enable_ai_image and not apikey: + logger.error('启用了AI配图但未填写API密钥') + + except Exception as e: + logger.error(f"处理配图失败: {str(e)}") + + async def _send_feed(self, story: str) -> bool: + """发送说说到QQ空间""" + try: + # 获取配置 + port = str(self.get_config("plugin.http_port", "3000")) + host = str(self.get_config("plugin.http_host", "127.0.0.1")) + qq_account = config_api.get_global_config("bot.qq_account", "") + enable_image = bool(self.get_config("send.enable_image", False)) + image_dir = str(self.get_config("send.image_directory", "./plugins/Maizone/images")) + + # 创建QZone管理器并发送 + qzone_manager = QZoneManager(port, host) + success = await qzone_manager.send_feed(story, image_dir, qq_account, enable_image) + + return success + + except Exception as e: + logger.error(f"发送说说失败: {str(e)}") + return False + + +# ===== 阅读说说动作组件 ===== +class ReadFeedAction(BaseAction): + """阅读说说动作 - 当用户要求读说说时激活""" + + action_name = "read_feed" + action_description = "读取好友最近的动态/说说/qq空间并评论点赞" + activation_type = ActionActivationType.KEYWORD + mode_enable = ChatMode.ALL + + activation_keywords = ["说说", "空间", "动态"] + keyword_case_sensitive = False + + action_parameters = { + "target_name": "需要阅读动态的好友的qq名称", + "user_name": "要求你阅读动态的好友的qq名称" + } + + action_require = [ + "需要阅读某人动态、说说、QQ空间时使用", + "当有人希望你评价某人的动态、说说、QQ空间", + "当你认为适合阅读说说、动态、QQ空间时使用", + ] + associated_types = ["text"] + + def __init__(self, *args, **kwargs): + super().__init__(*args, **kwargs) + # 获取配置加载器引用 + self.config_loader = None + self._init_config_loader() + + def _init_config_loader(self): + """初始化配置加载器""" + try: + plugin_dir = os.path.dirname(__file__) + self.config_loader = MaiZoneConfigLoader(plugin_dir) + self.config_loader.load_config() + except Exception as e: + logger.error(f"初始化配置加载器失败: {e}") + + def get_config(self, key: str, default=None): + """获取配置值""" + if self.config_loader: + return self.config_loader.get_config(key, default) + return default + + def check_permission(self, qq_account: str) -> bool: + """检查用户权限""" + permission_list = self.get_config("read.permission", []) + permission_type = self.get_config("read.permission_type", "blacklist") + + if not isinstance(permission_list, list): + return False + + logger.info(f'权限检查: {permission_type}:{permission_list}') + + if permission_type == 'whitelist': + return qq_account in permission_list + elif permission_type == 'blacklist': + return qq_account not in permission_list + else: + logger.error('权限类型配置错误') + return False + + async def execute(self) -> Tuple[bool, str]: + """执行阅读说说动作""" + try: + # 获取用户信息 + user_name = self.action_data.get("user_name", "") + person_id = person_api.get_person_id_by_name(user_name) + user_id = await person_api.get_person_value(person_id, "user_id") + + # 权限检查 + if not self.check_permission(user_id): + logger.info(f"用户 {user_id} 权限不足") + success, reply_set, _ = await generator_api.generate_reply( + chat_stream=self.chat_stream, + action_data={"extra_info_block": f'{user_name}无权命令你阅读说说,请用符合人格的方式进行拒绝的回复'} + ) + if success and reply_set: + for reply_type, reply_content in reply_set: + if reply_type == "text": + await self.send_text(reply_content) + return False, "权限不足" + + # 获取目标用户 + target_name = self.action_data.get("target_name", "") + target_person_id = person_api.get_person_id_by_name(target_name) + target_qq = await person_api.get_person_value(target_person_id, "user_id") + + # 读取并处理说说 + success = await self._read_and_process_feeds(target_qq, target_name) + + if success: + # 生成回复 + success, reply_set, _ = await generator_api.generate_reply( + chat_stream=self.chat_stream, + action_data={"extra_info_block": f'你刚刚成功读了{target_name}的说说,请告知你已经读了说说'} + ) + + if success and reply_set: + for reply_type, reply_content in reply_set: + if reply_type == "text": + await self.send_text(reply_content) + return True, '阅读成功' + return True, '阅读成功但回复生成失败' + else: + return False, "阅读说说失败" + + except Exception as e: + logger.error(f"阅读说说动作执行失败: {str(e)}") + return False, "动作执行失败" + + async def _read_and_process_feeds(self, target_qq: str, target_name: str) -> bool: + """读取并处理说说""" + try: + # 获取配置 + port = str(self.get_config("plugin.http_port", "3000")) + host = str(self.get_config("plugin.http_host", "127.0.0.1")) + qq_account = config_api.get_global_config("bot.qq_account", "") + num_raw = self.get_config("read.read_number", 5) + num = int(num_raw if num_raw is not None else 5) + like_raw = self.get_config("read.like_possibility", 1.0) + like_possibility = float(like_raw if like_raw is not None else 1.0) + comment_raw = self.get_config("read.comment_possibility", 1.0) + comment_possibility = float(comment_raw if comment_raw is not None else 1.0) + + # 创建QZone管理器并读取说说 + qzone_manager = QZoneManager(port, host) + feeds_list = await qzone_manager.read_feed(qq_account, target_qq, num) + + # 处理错误情况 + if isinstance(feeds_list, list) and len(feeds_list) > 0 and isinstance(feeds_list[0], dict) and 'error' in feeds_list[0]: + success, reply_set, _ = await generator_api.generate_reply( + chat_stream=self.chat_stream, + action_data={"extra_info_block": f'你在读取说说的时候出现了错误,错误原因:{feeds_list[0].get("error")}'} + ) + + if success and reply_set: + for reply_type, reply_content in reply_set: + if reply_type == "text": + await self.send_text(reply_content) + return True + + # 处理说说列表 + if isinstance(feeds_list, list): + logger.info(f"成功读取到{len(feeds_list)}条说说") + + for feed in feeds_list: + # 随机延迟 + time.sleep(3 + random.random()) + + # 处理说说内容 + await self._process_single_feed( + feed, target_qq, target_name, + like_possibility, comment_possibility, qzone_manager + ) + + return True + else: + return False + + except Exception as e: + logger.error(f"读取并处理说说失败: {str(e)}") + return False + + async def _process_single_feed(self, feed: dict, target_qq: str, target_name: str, + like_possibility: float, comment_possibility: float, + qzone_manager): + """处理单条说说""" + try: + content = feed.get("content", "") + images = feed.get("images", []) + if images: + for image in images: + content = content + str(image) + fid = feed.get("tid", "") + rt_con = feed.get("rt_con", "") + + # 随机评论 + if random.random() <= comment_possibility: + comment = await self._generate_comment(content, rt_con, target_name) + if comment: + success = await qzone_manager.comment_feed( + config_api.get_global_config("bot.qq_account", ""), + target_qq, fid, comment + ) + if success: + logger.info(f"发送评论'{comment}'成功") + else: + logger.error(f"评论说说'{content[:20]}...'失败") + + # 随机点赞 + if random.random() <= like_possibility: + success = await qzone_manager.like_feed( + config_api.get_global_config("bot.qq_account", ""), + target_qq, fid + ) + if success: + logger.info(f"点赞说说'{content[:10]}..'成功") + else: + logger.error(f"点赞说说'{content[:20]}...'失败") + + except Exception as e: + logger.error(f"处理单条说说失败: {str(e)}") + + async def _generate_comment(self, content: str, rt_con: str, target_name: str) -> str: + """生成评论内容""" + try: + # 获取模型配置 + models = llm_api.get_available_models() + text_model = str(self.get_config("models.text_model", "replyer_1")) + model_config = models.get(text_model) + + if not model_config: + return "" + + # 获取机器人信息 + bot_personality = config_api.get_global_config("personality.personality_core", "一个机器人") + bot_expression = config_api.get_global_config("expression.expression_style", "内容积极向上") + + # 构建提示词 + if not rt_con: + prompt = f""" + 你是'{bot_personality}',你正在浏览你好友'{target_name}'的QQ空间, + 你看到了你的好友'{target_name}'qq空间上内容是'{content}'的说说,你想要发表你的一条评论, + {bot_expression},回复的平淡一些,简短一些,说中文, + 不要刻意突出自身学科背景,不要浮夸,不要夸张修辞,不要输出多余内容(包括前后缀,冒号和引号,括号(),表情包,at或 @等 )。只输出回复内容 + """ + else: + prompt = f""" + 你是'{bot_personality}',你正在浏览你好友'{target_name}'的QQ空间, + 你看到了你的好友'{target_name}'在qq空间上转发了一条内容为'{rt_con}'的说说,你的好友的评论为'{content}' + 你想要发表你的一条评论,{bot_expression},回复的平淡一些,简短一些,说中文, + 不要刻意突出自身学科背景,不要浮夸,不要夸张修辞,不要输出多余内容(包括前后缀,冒号和引号,括号(),表情包,at或 @等 )。只输出回复内容 + """ + + logger.info(f"正在评论'{target_name}'的说说:{content[:20]}...") + + # 生成评论 + success, comment, reasoning, model_name = await llm_api.generate_with_model( + prompt=prompt, + model_config=model_config, + request_type="story.generate", + temperature=0.3, + max_tokens=1000 + ) + + if success: + logger.info(f"成功生成评论内容:'{comment}'") + return comment + else: + logger.error("生成评论内容失败") + return "" + + except Exception as e: + logger.error(f"生成评论内容异常: {str(e)}") + return "" + + +# ===== 插件主类 ===== +@register_plugin +class MaiZonePlugin(BasePlugin): + """MaiZone插件 - 让麦麦发QQ空间""" + + # 插件基本信息 + plugin_name: str = "MaiZonePlugin" + enable_plugin: bool = True + dependencies: List[str] = [] + python_dependencies: List[str] = [] + config_file_name: str = "config.toml" + + # 配置节描述 + config_section_descriptions = { + "plugin": "插件基础配置", + "models": "模型相关配置", + "send": "发送说说配置", + "read": "阅读说说配置", + "monitor": "自动监控配置", + "schedule": "定时发送配置", + } + + # 配置模式定义 + config_schema: dict = { + "plugin": { + "enable": ConfigField(type=bool, default=True, description="是否启用插件"), + "config_version": ConfigField(type=str, default="2.0.0", description="配置文件版本"), + "http_port": ConfigField(type=str, default='3000', description="NapCat HTTP服务器端口号"), + "http_host": ConfigField(type=str, default='127.0.0.1', description="NapCat HTTP服务器地址"), + }, + "models": { + "text_model": ConfigField(type=str, default="replyer_1", description="生成文本的模型名称"), + "siliconflow_apikey": ConfigField(type=str, default="", description="硅基流动AI生图API密钥"), + }, + "send": { + "permission": ConfigField(type=list, default=['2488036428'], description="发送权限QQ号列表"), + "permission_type": ConfigField(type=str, default='whitelist', description="权限类型:whitelist(白名单) 或 blacklist(黑名单)"), + "enable_image": ConfigField(type=bool, default=False, description="是否启用说说配图"), + "enable_ai_image": ConfigField(type=bool, default=False, description="是否启用AI生成配图"), + "enable_reply": ConfigField(type=bool, default=True, description="生成完成时是否发出回复"), + "ai_image_number": ConfigField(type=int, default=1, description="AI生成图片数量(1-4张)"), + "image_directory": ConfigField(type=str, default="./plugins/built_in/Maizone/images", description="图片存储目录") + }, + "read": { + "permission": ConfigField(type=list, default=[], description="阅读权限QQ号列表"), + "permission_type": ConfigField(type=str, default='blacklist', description="权限类型:whitelist(白名单) 或 blacklist(黑名单)"), + "read_number": ConfigField(type=int, default=5, description="一次读取的说说数量"), + "like_possibility": ConfigField(type=float, default=1.0, description="点赞概率(0.0-1.0)"), + "comment_possibility": ConfigField(type=float, default=0.3, description="评论概率(0.0-1.0)"), + }, + "monitor": { + "enable_auto_monitor": ConfigField(type=bool, default=False, description="是否启用自动监控好友说说"), + "interval_minutes": ConfigField(type=int, default=10, description="监控间隔时间(分钟)"), + }, + "schedule": { + "enable_schedule": ConfigField(type=bool, default=False, description="是否启用定时发送说说"), + "schedules": ConfigField( + type=list, + default=[ + {"time": "08:00", "topic": "早安"}, + {"time": "22:00", "topic": "晚安"} + ], + description="定时发送任务列表" + ), + }, + } + + def __init__(self, *args, **kwargs): + """初始化插件""" + super().__init__(*args, **kwargs) + + # 设置插件信息 + self.plugin_name = "MaiZone" + self.plugin_description = "让麦麦实现QQ空间点赞、评论、发说说功能" + self.plugin_version = "2.0.0" + self.plugin_author = "重构版" + self.config_file_name = "config.toml" + + # 初始化独立配置加载器 + plugin_dir = self.plugin_dir + if plugin_dir is None: + plugin_dir = os.path.dirname(__file__) + self.config_loader = MaiZoneConfigLoader(plugin_dir, self.config_file_name) + + # 加载配置 + if not self.config_loader.load_config(): + logger.error("配置加载失败,使用默认设置") + + # 获取启用状态 + self.enable_plugin = self.config_loader.get_config("plugin.enable", True) + + # 初始化管理器 + self.monitor_manager = None + self.schedule_manager = None + + # 根据配置启动功能 + if self.enable_plugin: + self._init_managers() + + def _init_managers(self): + """初始化管理器""" + try: + # 初始化监控管理器 + if self.config_loader.get_config("monitor.enable_auto_monitor", False): + from .monitor import MonitorManager + self.monitor_manager = MonitorManager(self) + asyncio.create_task(self._start_monitor_delayed()) + + # 初始化定时管理器 + if self.config_loader.get_config("schedule.enable_schedule", False): + logger.info(f"定时任务启用状态: true") + self.schedule_manager = ScheduleManager(self) + asyncio.create_task(self._start_scheduler_delayed()) + + except Exception as e: + logger.error(f"初始化管理器失败: {str(e)}") + + async def _start_monitor_delayed(self): + """延迟启动监控管理器""" + try: + await asyncio.sleep(10) # 等待插件完全初始化 + if self.monitor_manager: + await self.monitor_manager.start() + except Exception as e: + logger.error(f"启动监控管理器失败: {str(e)}") + + async def _start_scheduler_delayed(self): + """延迟启动定时管理器""" + try: + await asyncio.sleep(10) # 等待插件完全初始化 + if self.schedule_manager: + await self.schedule_manager.start() + except Exception as e: + logger.error(f"启动定时管理器失败: {str(e)}") + + def get_plugin_components(self) -> List[Tuple[ComponentInfo, Type]]: + """获取插件组件列表""" + return [ + (SendFeedAction.get_action_info(), SendFeedAction), + (ReadFeedAction.get_action_info(), ReadFeedAction), + (SendFeedCommand.get_command_info(), SendFeedCommand) + ] diff --git a/src/plugins/built_in/Maizone/qzone_utils.py b/src/plugins/built_in/Maizone/qzone_utils.py new file mode 100644 index 000000000..67efab304 --- /dev/null +++ b/src/plugins/built_in/Maizone/qzone_utils.py @@ -0,0 +1,1026 @@ +import base64 +import json +import os +import random +import time +import datetime +import traceback +from typing import List, Dict, Any, Optional +from pathlib import Path + +import httpx +import requests +import asyncio +import bs4 +import json5 + +from src.chat.utils.utils_image import get_image_manager +from src.common.logger import get_logger +from src.plugin_system.apis import llm_api, config_api, emoji_api + +# 获取日志记录器 +logger = get_logger('MaiZone-Utils') + + +class CookieManager: + """Cookie管理类 - 负责处理QQ空间的认证Cookie""" + + @staticmethod + def get_cookie_file_path(uin: str) -> str: + """获取Cookie文件路径""" + return os.path.join(os.getcwd(), 'plugins/Maizone/', f"cookies-{uin}.json") + + @staticmethod + def parse_cookie_string(cookie_str: str) -> Dict[str, str]: + """解析Cookie字符串为字典""" + cookies: Dict[str, str] = {} + if not cookie_str: + return cookies + + for pair in cookie_str.split("; "): + if not pair or "=" not in pair: + continue + key, value = pair.split("=", 1) + cookies[key.strip()] = value.strip() + return cookies + + @staticmethod + def extract_uin_from_cookie(cookie_str: str) -> str: + """从Cookie中提取用户UIN""" + for item in cookie_str.split("; "): + if item.startswith("uin=") or item.startswith("o_uin="): + _, value = item.split("=", 1) + return value.lstrip("o") + raise ValueError("无法从Cookie字符串中提取UIN") + + @staticmethod + async def fetch_cookies(domain: str, port: str, host: str) -> Dict[str, Any]: + """从NapCat获取Cookie""" + url = f"http://{host}:{port}/get_cookies?domain={domain}" + try: + async with httpx.AsyncClient(timeout=10.0, trust_env=False) as client: + resp = await client.get(url) + resp.raise_for_status() + data = resp.json() + + if data.get("status") != "ok" or "cookies" not in data.get("data", {}): + raise RuntimeError(f"获取Cookie失败: {data}") + + return data["data"] + except Exception as e: + logger.error(f"从NapCat获取Cookie失败: {str(e)}") + raise + + @staticmethod + async def renew_cookies(port: str, host: str) -> bool: + """更新Cookie文件""" + try: + domain = "user.qzone.qq.com" + cookie_data = await CookieManager.fetch_cookies(domain, port, host) + cookie_str = cookie_data["cookies"] + parsed_cookies = CookieManager.parse_cookie_string(cookie_str) + uin = CookieManager.extract_uin_from_cookie(cookie_str) + + file_path = CookieManager.get_cookie_file_path(uin) + + with open(file_path, "w", encoding="utf-8") as f: + json.dump(parsed_cookies, f, indent=4, ensure_ascii=False) + + logger.info(f"Cookie已更新并保存至: {file_path}") + return True + + except Exception as e: + logger.error(f"更新Cookie失败: {str(e)}") + return False + + @staticmethod + def load_cookies(qq_account: str) -> Optional[Dict[str, str]]: + """加载Cookie文件""" + cookie_file = CookieManager.get_cookie_file_path(qq_account) + + if os.path.exists(cookie_file): + try: + with open(cookie_file, 'r', encoding='utf-8') as f: + return json.load(f) + except Exception as e: + logger.error(f"加载Cookie文件失败: {str(e)}") + return None + else: + logger.warning(f"Cookie文件不存在: {cookie_file}") + return None + + +class QZoneAPI: + """QQ空间API类 - 封装QQ空间的核心操作""" + + # QQ空间API地址常量 + UPLOAD_IMAGE_URL = "https://up.qzone.qq.com/cgi-bin/upload/cgi_upload_image" + EMOTION_PUBLISH_URL = "https://user.qzone.qq.com/proxy/domain/taotao.qzone.qq.com/cgi-bin/emotion_cgi_publish_v6" + DOLIKE_URL = "https://user.qzone.qq.com/proxy/domain/w.qzone.qq.com/cgi-bin/likes/internal_dolike_app" + COMMENT_URL = "https://user.qzone.qq.com/proxy/domain/taotao.qzone.qq.com/cgi-bin/emotion_cgi_re_feeds" + LIST_URL = "https://user.qzone.qq.com/proxy/domain/taotao.qq.com/cgi-bin/emotion_cgi_msglist_v6" + ZONE_LIST_URL = "https://user.qzone.qq.com/proxy/domain/ic2.qzone.qq.com/cgi-bin/feeds/feeds3_html_more" + + def __init__(self, cookies_dict: Optional[Dict[str, str]] = None): + """初始化QZone API""" + self.cookies = cookies_dict or {} + self.gtk2 = '' + self.uin = 0 + self.qzonetoken = '' + + # 生成gtk2 + p_skey = self.cookies.get('p_skey') or self.cookies.get('p_skey'.upper()) + if p_skey: + self.gtk2 = self._generate_gtk(p_skey) + + # 提取UIN + uin_raw = self.cookies.get('uin') or self.cookies.get('o_uin') or self.cookies.get('p_uin') + if isinstance(uin_raw, str) and uin_raw: + uin_str = uin_raw.lstrip('o') + try: + self.uin = int(uin_str) + except Exception: + logger.error(f"UIN格式错误: {uin_raw}") + + def _generate_gtk(self, skey: str) -> str: + """生成GTK令牌""" + hash_val = 5381 + for i in range(len(skey)): + hash_val += (hash_val << 5) + ord(skey[i]) + return str(hash_val & 2147483647) + + async def _do_request( + self, + method: str, + url: str, + params: Dict = None, + data: Dict = None, + headers: Dict = None, + timeout: int = 10 + ) -> requests.Response: + """执行HTTP请求""" + try: + return requests.request( + method=method, + url=url, + params=params or {}, + data=data or {}, + headers=headers or {}, + cookies=self.cookies, + timeout=timeout + ) + except Exception as e: + logger.error(f"HTTP请求失败: {str(e)}") + raise + + async def validate_token(self, retry: int = 3) -> bool: + """验证Token有效性""" + # 简单验证 - 检查必要的Cookie是否存在 + required_cookies = ['p_skey', 'uin'] + for cookie in required_cookies: + if cookie not in self.cookies and cookie.upper() not in self.cookies: + logger.error(f"缺少必要的Cookie: {cookie}") + return False + return True + + def _image_to_base64(self, image: bytes) -> str: + """将图片转换为Base64""" + pic_base64 = base64.b64encode(image) + return str(pic_base64)[2:-1] + + async def _get_image_base64_by_url(self, url: str) -> str: + """通过URL获取图片的Base64编码""" + try: + res = await self._do_request("GET", url, timeout=60) + image_data = res.content + base64_str = base64.b64encode(image_data).decode('utf-8') + return base64_str + except Exception as e: + logger.error(f"获取图片Base64失败: {str(e)}") + raise + + async def upload_image(self, image: bytes) -> Dict[str, Any]: + """上传图片到QQ空间""" + try: + res = await self._do_request( + method="POST", + url=self.UPLOAD_IMAGE_URL, + data={ + "filename": "filename", + "zzpanelkey": "", + "uploadtype": "1", + "albumtype": "7", + "exttype": "0", + "skey": self.cookies["skey"], + "zzpaneluin": self.uin, + "p_uin": self.uin, + "uin": self.uin, + "p_skey": self.cookies['p_skey'], + "output_type": "json", + "qzonetoken": "", + "refer": "shuoshuo", + "charset": "utf-8", + "output_charset": "utf-8", + "upload_hd": "1", + "hd_width": "2048", + "hd_height": "10000", + "hd_quality": "96", + "backUrls": "http://upbak.photo.qzone.qq.com/cgi-bin/upload/cgi_upload_image,http://119.147.64.75/cgi-bin/upload/cgi_upload_image", + "url": "https://up.qzone.qq.com/cgi-bin/upload/cgi_upload_image?g_tk=" + self.gtk2, + "base64": "1", + "picfile": self._image_to_base64(image), + }, + headers={ + 'referer': 'https://user.qzone.qq.com/' + str(self.uin), + 'origin': 'https://user.qzone.qq.com' + }, + timeout=60 + ) + + if res.status_code == 200: + # 解析返回的JSON数据 + response_text = res.text + json_start = response_text.find('{') + json_end = response_text.rfind('}') + 1 + json_str = response_text[json_start:json_end] + return eval(json_str) # 使用eval解析,因为可能不是标准JSON + else: + raise Exception(f"上传图片失败,状态码: {res.status_code}") + + except Exception as e: + logger.error(f"上传图片异常: {str(e)}") + raise + + def _get_picbo_and_richval(self, upload_result: Dict[str, Any]) -> tuple[str, str]: + """从上传结果中提取picbo和richval""" + try: + if upload_result.get('ret') != 0: + raise Exception("上传图片失败") + + picbo_spt = upload_result['data']['url'].split('&bo=') + if len(picbo_spt) < 2: + raise Exception("解析图片URL失败") + picbo = picbo_spt[1] + + data = upload_result['data'] + richval = f",{data['albumid']},{data['lloc']},{data['sloc']},{data['type']},{data['height']},{data['width']},,{data['height']},{data['width']}" + + return picbo, richval + + except Exception as e: + logger.error(f"提取图片信息失败: {str(e)}") + raise + + async def publish_emotion(self, content: str, images: List[bytes] = None) -> str: + """发布说说""" + if images is None: + images = [] + + try: + post_data = { + "syn_tweet_verson": "1", + "paramstr": "1", + "who": "1", + "con": content, + "feedversion": "1", + "ver": "1", + "ugc_right": "1", + "to_sign": "0", + "hostuin": self.uin, + "code_version": "1", + "format": "json", + "qzreferrer": "https://user.qzone.qq.com/" + str(self.uin) + } + + # 处理图片 + if len(images) > 0: + pic_bos = [] + richvals = [] + + for img in images: + upload_result = await self.upload_image(img) + picbo, richval = self._get_picbo_and_richval(upload_result) + pic_bos.append(picbo) + richvals.append(richval) + + post_data['pic_bo'] = ','.join(pic_bos) + post_data['richtype'] = '1' + post_data['richval'] = '\t'.join(richvals) + + res = await self._do_request( + method="POST", + url=self.EMOTION_PUBLISH_URL, + params={'g_tk': self.gtk2, 'uin': self.uin}, + data=post_data, + headers={ + 'referer': 'https://user.qzone.qq.com/' + str(self.uin), + 'origin': 'https://user.qzone.qq.com' + } + ) + + if res.status_code == 200: + result = res.json() + return result.get('tid', '') + else: + raise Exception(f"发表说说失败,状态码: {res.status_code}") + + except Exception as e: + logger.error(f"发表说说异常: {str(e)}") + raise + + async def like_feed(self, fid: str, target_qq: str) -> bool: + """点赞说说""" + try: + post_data = { + 'qzreferrer': f'https://user.qzone.qq.com/{self.uin}', + 'opuin': self.uin, + 'unikey': f'http://user.qzone.qq.com/{target_qq}/mood/{fid}', + 'curkey': f'http://user.qzone.qq.com/{target_qq}/mood/{fid}', + 'appid': 311, + 'from': 1, + 'typeid': 0, + 'abstime': int(time.time()), + 'fid': fid, + 'active': 0, + 'format': 'json', + 'fupdate': 1, + } + + res = await self._do_request( + method="POST", + url=self.DOLIKE_URL, + params={'g_tk': self.gtk2}, + data=post_data, + headers={ + 'referer': 'https://user.qzone.qq.com/' + str(self.uin), + 'origin': 'https://user.qzone.qq.com' + } + ) + + return res.status_code == 200 + + except Exception as e: + logger.error(f"点赞说说异常: {str(e)}") + return False + + async def comment_feed(self, fid: str, target_qq: str, content: str) -> bool: + """评论说说""" + try: + post_data = { + "topicId": f'{target_qq}_{fid}__1', + "uin": self.uin, + "hostUin": target_qq, + "feedsType": 100, + "inCharset": "utf-8", + "outCharset": "utf-8", + "plat": "qzone", + "source": "ic", + "platformid": 52, + "format": "fs", + "ref": "feeds", + "content": content, + } + + res = await self._do_request( + method="POST", + url=self.COMMENT_URL, + params={"g_tk": self.gtk2}, + data=post_data, + headers={ + 'referer': 'https://user.qzone.qq.com/' + str(self.uin), + 'origin': 'https://user.qzone.qq.com' + } + ) + + return res.status_code == 200 + + except Exception as e: + logger.error(f"评论说说异常: {str(e)}") + return False + + async def get_feed_list(self, target_qq: str, num: int) -> List[Dict[str, Any]]: + """获取指定用户的说说列表""" + try: + logger.info(f'获取用户 {target_qq} 的说说列表') + + res = await self._do_request( + method="GET", + url=self.LIST_URL, + params={ + 'g_tk': self.gtk2, + "uin": target_qq, + "ftype": 0, + "sort": 0, + "pos": 0, + "num": num, + "replynum": 100, + "callback": "_preloadCallback", + "code_version": 1, + "format": "jsonp", + "need_comment": 1, + "need_private_comment": 1 + }, + headers={ + "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36", + "Referer": f"https://user.qzone.qq.com/{target_qq}", + "Host": "user.qzone.qq.com", + "Connection": "keep-alive" + } + ) + + if res.status_code != 200: + raise Exception(f"访问失败,状态码: {res.status_code}") + + # 解析JSONP响应 + data = res.text + if data.startswith('_preloadCallback(') and data.endswith(');'): + json_str = data[len('_preloadCallback('):-2] + else: + json_str = data + + json_data = json.loads(json_str) + + if json_data.get('code') != 0: + return [{"error": json_data.get('message', '未知错误')}] + + # 解析说说列表 + return await self._parse_feed_list(json_data, target_qq) + + except Exception as e: + logger.error(f"获取说说列表失败: {str(e)}") + return [{"error": f'获取说说列表失败: {str(e)}'}] + + async def _parse_feed_list(self, json_data: Dict[str, Any], target_qq: str) -> List[Dict[str, Any]]: + """解析说说列表数据""" + try: + feeds_list = [] + login_info = json_data.get('logininfo', {}) + uin_nickname = login_info.get('name', '') + + for msg in json_data.get("msglist", []): + # 检查是否已经评论过 + is_commented = False + commentlist = msg.get("commentlist", []) + + if isinstance(commentlist, list): + for comment in commentlist: + if comment.get("name") == uin_nickname: + logger.info('已评论过此说说,跳过') + is_commented = True + break + + if not is_commented: + # 解析说说信息 + feed_info = await self._parse_single_feed(msg) + if feed_info: + feeds_list.append(feed_info) + + if len(feeds_list) == 0: + return [{"error": '你已经看过所有说说了,没有必要再看一遍'}] + + return feeds_list + + except Exception as e: + logger.error(f"解析说说列表失败: {str(e)}") + return [{"error": f'解析说说列表失败: {str(e)}'}] + + async def _parse_single_feed(self, msg: Dict[str, Any]) -> Optional[Dict[str, Any]]: + """解析单条说说信息""" + try: + # 基本信息 + timestamp = msg.get("created_time", "") + created_time = "unknown" + if timestamp: + time_tuple = time.localtime(timestamp) + created_time = time.strftime('%Y-%m-%d %H:%M:%S', time_tuple) + + tid = msg.get("tid", "") + content = msg.get("content", "") + + logger.debug(f"正在解析说说: {content[:20]}...") + + # 解析图片 + images = [] + if 'pic' in msg: + for pic in msg['pic']: + url = pic.get('url1') or pic.get('pic_id') or pic.get('smallurl') + if url: + try: + image_base64 = await self._get_image_base64_by_url(url) + image_manager = get_image_manager() + image_description = await image_manager.get_image_description(image_base64) + images.append(image_description) + except Exception as e: + logger.warning(f"处理图片失败: {str(e)}") + + # 解析视频 + videos = [] + if 'video' in msg: + for video in msg['video']: + # 视频缩略图 + video_image_url = video.get('url1') or video.get('pic_url') + if video_image_url: + try: + image_base64 = await self._get_image_base64_by_url(video_image_url) + image_manager = get_image_manager() + image_description = await image_manager.get_image_description(image_base64) + images.append(f"视频缩略图: {image_description}") + except Exception as e: + logger.warning(f"处理视频缩略图失败: {str(e)}") + + # 视频URL + url = video.get('url3') + if url: + videos.append(url) + + # 解析转发内容 + rt_con = "" + if "rt_con" in msg: + rt_con_data = msg.get("rt_con") + if isinstance(rt_con_data, dict): + rt_con = rt_con_data.get("content", "") + + return { + "tid": tid, + "created_time": created_time, + "content": content, + "images": images, + "videos": videos, + "rt_con": rt_con + } + + except Exception as e: + logger.error(f"解析单条说说失败: {str(e)}") + return None + + async def get_monitor_feed_list(self, num: int) -> List[Dict[str, Any]]: + """获取监控用的说说列表(所有好友的最新动态)""" + try: + res = await self._do_request( + method="GET", + url=self.ZONE_LIST_URL, + params={ + "uin": self.uin, + "scope": 0, + "view": 1, + "filter": "all", + "flag": 1, + "applist": "all", + "pagenum": 1, + "count": num, + "aisortEndTime": 0, + "aisortOffset": 0, + "aisortBeginTime": 0, + "begintime": 0, + "format": "json", + "g_tk": self.gtk2, + "useutf8": 1, + "outputhtmlfeed": 1 + }, + headers={ + "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36", + "Referer": f"https://user.qzone.qq.com/{self.uin}", + "Host": "user.qzone.qq.com", + "Connection": "keep-alive" + } + ) + + if res.status_code != 200: + raise Exception(f"访问失败,状态码: {res.status_code}") + + # 解析响应数据 + data = res.text + if data.startswith('_Callback(') and data.endswith(');'): + data = data[len('_Callback('):-2] + + data = data.replace('undefined', 'null') + + try: + json_data = json5.loads(data) + feeds_data = json_data['data']['data'] + except Exception as e: + logger.error(f"解析JSON数据失败: {str(e)}") + return [] + + # 解析说说列表 + return await self._parse_monitor_feeds(feeds_data) + + except Exception as e: + logger.error(f"获取监控说说列表失败: {str(e)}") + return [] + + async def _parse_monitor_feeds(self, feeds_data: List[Dict[str, Any]]) -> List[Dict[str, Any]]: + """解析监控说说数据""" + try: + feeds_list = [] + current_uin = str(self.uin) + + for feed in feeds_data: + if not feed: + continue + + # 过滤广告和非说说内容 + appid = str(feed.get('appid', '')) + if appid != '311': + continue + + target_qq = feed.get('uin', '') + tid = feed.get('key', '') + + if not target_qq or not tid: + continue + + # 过滤自己的说说 + if target_qq == current_uin: + continue + + # 解析HTML内容 + html_content = feed.get('html', '') + if not html_content: + continue + + feed_info = await self._parse_monitor_html(html_content, target_qq, tid) + if feed_info: + feeds_list.append(feed_info) + + logger.info(f"成功解析 {len(feeds_list)} 条未读说说") + return feeds_list + + except Exception as e: + logger.error(f"解析监控说说数据失败: {str(e)}") + return [] + + async def _parse_monitor_html(self, html_content: str, target_qq: str, tid: str) -> Optional[Dict[str, Any]]: + """解析监控说说的HTML内容""" + try: + soup = bs4.BeautifulSoup(html_content, 'html.parser') + + # 检查是否已经点赞(判断是否已读) + like_btn = soup.find('a', class_='qz_like_btn_v3') + if not like_btn: + like_btn = soup.find('a', attrs={'data-islike': True}) + + if like_btn: + data_islike = like_btn.get('data-islike') + if data_islike == '1': # 已点赞,跳过 + return None + + # 提取文字内容 + text_div = soup.find('div', class_='f-info') + text = text_div.get_text(strip=True) if text_div else "" + + # 提取转发内容 + rt_con = "" + txt_box = soup.select_one('div.txt-box') + if txt_box: + rt_con = txt_box.get_text(strip=True) + if ':' in rt_con: + rt_con = rt_con.split(':', 1)[1].strip() + + # 提取图片 + images = [] + img_box = soup.find('div', class_='img-box') + if img_box: + for img in img_box.find_all('img'): + src = img.get('src') + if src and not src.startswith('http://qzonestyle.gtimg.cn'): + try: + image_base64 = await self._get_image_base64_by_url(src) + image_manager = get_image_manager() + description = await image_manager.get_image_description(image_base64) + images.append(description) + except Exception as e: + logger.warning(f"处理图片失败: {str(e)}") + + # 视频缩略图 + img_tag = soup.select_one('div.video-img img') + if img_tag and 'src' in img_tag.attrs: + try: + image_base64 = await self._get_image_base64_by_url(img_tag['src']) + image_manager = get_image_manager() + description = await image_manager.get_image_description(image_base64) + images.append(f"视频缩略图: {description}") + except Exception as e: + logger.warning(f"处理视频缩略图失败: {str(e)}") + + # 视频URL + videos = [] + video_div = soup.select_one('div.img-box.f-video-wrap.play') + if video_div and 'url3' in video_div.attrs: + videos.append(video_div['url3']) + + return { + 'target_qq': target_qq, + 'tid': tid, + 'content': text, + 'images': images, + 'videos': videos, + 'rt_con': rt_con, + } + + except Exception as e: + logger.error(f"解析监控HTML失败: {str(e)}") + return None + + +class QZoneManager: + """QQ空间管理器 - 高级封装类""" + + def __init__(self, port: str, host: str): + """初始化QZone管理器""" + self.port = port + self.host = host + self.cookie_manager = CookieManager() + + async def _get_qzone_api(self, qq_account: str) -> Optional[QZoneAPI]: + """获取QZone API实例""" + try: + # 更新Cookie + await self.cookie_manager.renew_cookies(self.port, self.host) + + # 加载Cookie + cookies = self.cookie_manager.load_cookies(qq_account) + if not cookies: + logger.error("无法加载Cookie") + return None + + # 创建API实例 + qzone_api = QZoneAPI(cookies) + + # 验证Token + if not await qzone_api.validate_token(): + logger.error("Token验证失败") + return None + + return qzone_api + + except Exception as e: + logger.error(f"获取QZone API失败: {str(e)}") + return None + + async def send_feed(self, message: str, image_directory: str, qq_account: str, enable_image: bool) -> bool: + """发送说说""" + try: + # 获取API实例 + qzone_api = await self._get_qzone_api(qq_account) + if not qzone_api: + return False + + # 处理图片 + images = [] + if enable_image: + images = await self._load_images(image_directory, message) + + # 发送说说 + tid = await qzone_api.publish_emotion(message, images) + if tid: + logger.info(f"成功发送说说,TID: {tid}") + return True + else: + logger.error("发送说说失败") + return False + + except Exception as e: + logger.error(f"发送说说异常: {str(e)}") + return False + + async def _load_images(self, image_directory: str, message: str) -> List[bytes]: + """加载图片文件""" + images = [] + + try: + if os.path.exists(image_directory): + # 获取所有未处理的图片文件 + all_files = [f for f in os.listdir(image_directory) + if os.path.isfile(os.path.join(image_directory, f))] + unprocessed_files = [f for f in all_files if not f.startswith("done_")] + unprocessed_files_sorted = sorted(unprocessed_files) + + for image_file in unprocessed_files_sorted: + full_path = os.path.join(image_directory, image_file) + try: + with open(full_path, "rb") as img: + images.append(img.read()) + + # 重命名已处理的文件 + timestamp = datetime.datetime.now().strftime("%Y%m%d_%H%M%S") + new_filename = f"done_{timestamp}_{image_file}" + new_path = os.path.join(image_directory, new_filename) + os.rename(full_path, new_path) + + except Exception as e: + logger.warning(f"处理图片文件 {image_file} 失败: {str(e)}") + + # 如果没有图片文件,尝试获取表情包 + if not images: + image = await emoji_api.get_by_description(message) + if image: + image_base64, description, scene = image + image_data = base64.b64decode(image_base64) + images.append(image_data) + + except Exception as e: + logger.error(f"加载图片失败: {str(e)}") + + return images + + async def read_feed(self, qq_account: str, target_qq: str, num: int) -> List[Dict[str, Any]]: + """读取指定用户的说说""" + try: + # 获取API实例 + qzone_api = await self._get_qzone_api(qq_account) + if not qzone_api: + return [{"error": "无法获取QZone API"}] + + # 获取说说列表 + feeds_list = await qzone_api.get_feed_list(target_qq, num) + return feeds_list + + except Exception as e: + logger.error(f"读取说说失败: {str(e)}") + return [{"error": f"读取说说失败: {str(e)}"}] + + async def monitor_read_feed(self, qq_account: str, num: int) -> List[Dict[str, Any]]: + """监控读取所有好友的说说""" + try: + # 获取API实例 + qzone_api = await self._get_qzone_api(qq_account) + if not qzone_api: + return [] + + # 获取监控说说列表 + feeds_list = await qzone_api.get_monitor_feed_list(num) + return feeds_list + + except Exception as e: + logger.error(f"监控读取说说失败: {str(e)}") + return [] + + async def like_feed(self, qq_account: str, target_qq: str, fid: str) -> bool: + """点赞说说""" + try: + # 获取API实例 + qzone_api = await self._get_qzone_api(qq_account) + if not qzone_api: + return False + + # 点赞说说 + success = await qzone_api.like_feed(fid, target_qq) + return success + + except Exception as e: + logger.error(f"点赞说说失败: {str(e)}") + return False + + async def comment_feed(self, qq_account: str, target_qq: str, fid: str, content: str) -> bool: + """评论说说""" + try: + # 获取API实例 + qzone_api = await self._get_qzone_api(qq_account) + if not qzone_api: + return False + + # 评论说说 + success = await qzone_api.comment_feed(fid, target_qq, content) + return success + + except Exception as e: + logger.error(f"评论说说失败: {str(e)}") + return False + + +# ===== 辅助功能函数 ===== + +async def generate_image_by_sf(api_key: str, story: str, image_dir: str, batch_size: int = 1) -> bool: + """使用硅基流动API生成图片""" + try: + logger.info(f"正在生成图片,保存路径: {image_dir}") + + # 获取模型配置 + models = llm_api.get_available_models() + prompt_model = "replyer_1" + model_config = models.get(prompt_model) + + if not model_config: + logger.error('配置模型失败') + return False + + # 生成图片提示词 + bot_personality = config_api.get_global_config("personality.personality_core", "一个机器人") + bot_details = config_api.get_global_config("identity.identity_detail", "未知") + + success, prompt, reasoning, model_name = await llm_api.generate_with_model( + prompt=f""" + 请根据以下QQ空间说说内容配图,并构建生成配图的风格和prompt。 + 说说主人信息:'{bot_personality},{str(bot_details)}'。 + 说说内容:'{story}'。 + 请注意:仅回复用于生成图片的prompt,不要有其他的任何正文以外的冗余输出""", + model_config=model_config, + request_type="story.generate", + temperature=0.3, + max_tokens=1000 + ) + + if not success: + logger.error('生成说说配图prompt失败') + return False + + logger.info(f'即将生成说说配图:{prompt}') + + # 调用硅基流动API + sf_url = "https://api.siliconflow.cn/v1/images/generations" + sf_headers = { + "Authorization": f"Bearer {api_key}", + "Content-Type": "application/json" + } + sf_data = { + "model": "Kwai-Kolors/Kolors", + "prompt": prompt, + "negative_prompt": "lowres, bad anatomy, bad hands, text, error, cropped, worst quality, low quality, normal quality, jpeg artifacts, signature, watermark, username, blurry", + "image_size": "1024x1024", + "batch_size": batch_size, + "seed": random.randint(1, 9999999999), + "num_inference_steps": 20, + "guidance_scale": 7.5, + } + + res = requests.post(sf_url, headers=sf_headers, json=sf_data) + + if res.status_code != 200: + logger.error(f'生成图片出错,错误码: {res.status_code}') + return False + + json_data = res.json() + image_urls = [img["url"] for img in json_data["images"]] + + # 确保目录存在 + Path(image_dir).mkdir(parents=True, exist_ok=True) + + # 下载并保存图片 + for i, img_url in enumerate(image_urls): + try: + img_response = requests.get(img_url) + filename = f"sf_{i}_{int(time.time())}.png" + save_path = Path(image_dir) / filename + + with open(save_path, "wb") as f: + f.write(img_response.content) + + logger.info(f"图片已保存至: {save_path}") + + except Exception as e: + logger.error(f"下载图片失败: {str(e)}") + return False + + return True + + except Exception as e: + logger.error(f"生成图片失败: {str(e)}") + return False + + +async def get_send_history(qq_account: str) -> str: + """获取发送历史记录""" + try: + cookie_manager = CookieManager() + cookies = cookie_manager.load_cookies(qq_account) + + if not cookies: + return "" + + qzone_api = QZoneAPI(cookies) + + if not await qzone_api.validate_token(): + logger.error("Token验证失败") + return "" + + feeds_list = await qzone_api.get_feed_list(target_qq=qq_account, num=5) + + if not isinstance(feeds_list, list) or len(feeds_list) == 0: + return "" + + history_lines = ["==================="] + + for feed in feeds_list: + if not isinstance(feed, dict): + continue + + created_time = feed.get("created_time", "") + content = feed.get("content", "") + images = feed.get("images", []) + rt_con = feed.get("rt_con", "") + + if not rt_con: + history_lines.append( + f"\n时间:'{created_time}'\n说说内容:'{content}'\n图片:'{images}'\n===================" + ) + else: + history_lines.append( + f"\n时间: '{created_time}'\n转发了一条说说,内容为: '{rt_con}'\n图片: '{images}'\n对该说说的评论为: '{content}'\n===================" + ) + + return "".join(history_lines) + + except Exception as e: + logger.error(f"获取发送历史失败: {str(e)}") + return "" \ No newline at end of file diff --git a/src/plugins/built_in/Maizone/scheduler.py b/src/plugins/built_in/Maizone/scheduler.py new file mode 100644 index 000000000..cf1b5dd07 --- /dev/null +++ b/src/plugins/built_in/Maizone/scheduler.py @@ -0,0 +1,362 @@ +import asyncio +import datetime +import time +import traceback +import os +import toml +from typing import Dict, List, Any + +from src.common.logger import get_logger +from src.plugin_system.apis import llm_api, config_api + +# 导入工具模块 +import sys +import os +sys.path.append(os.path.dirname(__file__)) + +from qzone_utils import QZoneManager, get_send_history + +# 获取日志记录器 +logger = get_logger('MaiZone-Scheduler') + + +class ScheduleManager: + """定时任务管理器 - 负责定时发送说说""" + + def __init__(self, plugin): + """初始化定时任务管理器""" + self.plugin = plugin + self.is_running = False + self.task = None + self.last_send_times: Dict[str, float] = {} # 记录每个时间点的最后发送时间 + self.config_file_path = os.path.join(os.path.dirname(__file__), "config.toml") + + logger.info("定时任务管理器初始化完成") + + # 初始化时测试配置读取 + + def _read_schedule_config(self) -> List[Dict[str, str]]: + """直接从TOML配置文件读取日程配置""" + try: + if not os.path.exists(self.config_file_path): + logger.error(f"配置文件不存在: {self.config_file_path}") + return [] + + # 读取TOML文件 + with open(self.config_file_path, 'r', encoding='utf-8') as f: + config_data = toml.load(f) + + # 获取schedule配置 + schedule_config = config_data.get('schedule', {}) + schedules = schedule_config.get('schedules', []) + + logger.info(f"从配置文件读取到 {len(schedules)} 个定时任务") + + # 验证每个日程的格式 + valid_schedules = [] + for i, schedule in enumerate(schedules): + if isinstance(schedule, dict) and 'time' in schedule and 'topic' in schedule: + valid_schedules.append({ + 'time': str(schedule['time']), + 'topic': str(schedule['topic']) + }) + logger.debug(f"日程 {i+1}: {schedule['time']} - {schedule['topic']}") + else: + logger.warning(f"跳过无效的日程配置: {schedule}") + + return valid_schedules + + except Exception as e: + logger.error(f"读取日程配置失败: {str(e)}") + return [] + + def _is_schedule_enabled(self) -> bool: + """检查定时任务是否启用""" + try: + if not os.path.exists(self.config_file_path): + return False + + with open(self.config_file_path, 'r', encoding='utf-8') as f: + config_data = toml.load(f) + + schedule_config = config_data.get('schedule', {}) + enabled = schedule_config.get('enable_schedule', False) + + logger.debug(f"定时任务启用状态: {enabled}") + return bool(enabled) + + except Exception as e: + logger.error(f"检查定时任务启用状态失败: {str(e)}") + return False + + async def start(self): + """启动定时任务""" + if self.is_running: + logger.warning("定时任务已在运行中") + return + + self.is_running = True + self.task = asyncio.create_task(self._schedule_loop()) + logger.info("定时发送说说任务已启动") + + async def stop(self): + """停止定时任务""" + if not self.is_running: + return + + self.is_running = False + + if self.task: + self.task.cancel() + try: + await self.task + except asyncio.CancelledError: + logger.info("定时任务已被取消") + + logger.info("定时发送说说任务已停止") + + async def _schedule_loop(self): + """定时任务主循环""" + while self.is_running: + try: + # 检查定时任务是否启用 + if not self._is_schedule_enabled(): + logger.info("定时任务已禁用,等待下次检查") + await asyncio.sleep(60) + continue + + # 获取当前时间 + current_time = datetime.datetime.now().strftime("%H:%M") + + # 直接从配置文件读取定时任务配置 + schedules = self._read_schedule_config() + + if not schedules: + logger.info("未找到有效的定时任务配置") + await asyncio.sleep(60) + continue + + # 检查每个定时任务 + for schedule in schedules: + await self._check_and_execute_schedule(schedule, current_time) + + # 每分钟检查一次 + await asyncio.sleep(60) + + except asyncio.CancelledError: + logger.info("定时任务循环被取消") + break + except Exception as e: + logger.error(f"定时任务循环出错: {str(e)}") + logger.error(traceback.format_exc()) + # 出错后等待5分钟再重试 + await asyncio.sleep(300) + + async def _check_and_execute_schedule(self, schedule: Dict[str, Any], current_time: str): + """检查并执行定时任务""" + try: + schedule_time = schedule.get("time", "") + topic = schedule.get("topic", "") + + # 检查是否到达发送时间 + if current_time == schedule_time: + # 避免同一分钟内重复发送 + last_send_time = self.last_send_times.get(schedule_time, 0) + current_timestamp = time.time() + + if current_timestamp - last_send_time > 60: # 超过1分钟才允许发送 + logger.info(f"定时任务触发: {schedule_time} - 主题: {topic}") + self.last_send_times[schedule_time] = current_timestamp + + # 执行发送任务 + success = await self._execute_scheduled_send(topic) + + if success: + logger.info(f"定时说说发送成功: {topic}") + else: + logger.error(f"定时说说发送失败: {topic}") + else: + logger.debug(f"跳过重复发送: {schedule_time}") + + except Exception as e: + logger.error(f"检查定时任务失败: {str(e)}") + + async def _execute_scheduled_send(self, topic: str) -> bool: + """执行定时发送任务""" + try: + # 生成说说内容 + story = await self._generate_story_content(topic) + if not story: + logger.error("生成定时说说内容失败") + return False + + logger.info(f"定时任务生成说说内容: '{story}'") + + # 处理配图 + await self._handle_images(story) + + # 发送说说 + success = await self._send_scheduled_feed(story) + + return success + + except Exception as e: + logger.error(f"执行定时发送任务失败: {str(e)}") + return False + + async def _generate_story_content(self, topic: str) -> str: + """生成定时说说内容""" + try: + # 获取模型配置 + models = llm_api.get_available_models() + text_model = str(self.plugin.get_config("models.text_model", "replyer_1")) + model_config = models.get(text_model) + + if not model_config: + logger.error("未配置LLM模型") + return "" + + # 获取机器人信息 + bot_personality = config_api.get_global_config("personality.personality_core", "一个机器人") + bot_expression = config_api.get_global_config("expression.expression_style", "内容积极向上") + qq_account = config_api.get_global_config("bot.qq_account", "") + + # 构建提示词 + if topic: + prompt = f""" + 你是'{bot_personality}',你想写一条主题是'{topic}'的说说发表在qq空间上, + {bot_expression} + 不要刻意突出自身学科背景,不要浮夸,不要夸张修辞,可以适当使用颜文字, + 只输出一条说说正文的内容,不要有其他的任何正文以外的冗余输出 + """ + else: + prompt = f""" + 你是'{bot_personality}',你想写一条说说发表在qq空间上,主题不限 + {bot_expression} + 不要刻意突出自身学科背景,不要浮夸,不要夸张修辞,可以适当使用颜文字, + 只输出一条说说正文的内容,不要有其他的任何正文以外的冗余输出 + """ + + # 添加历史记录避免重复 + prompt += "\n以下是你最近发过的说说,写新说说时注意不要在相隔不长的时间发送相似内容的说说\n" + history_block = await get_send_history(qq_account) + if history_block: + prompt += history_block + + # 生成内容 + success, story, reasoning, model_name = await llm_api.generate_with_model( + prompt=prompt, + model_config=model_config, + request_type="story.generate", + temperature=0.3, + max_tokens=1000 + ) + + if success: + return story + else: + logger.error("生成定时说说内容失败") + return "" + + except Exception as e: + logger.error(f"生成定时说说内容异常: {str(e)}") + return "" + + async def _handle_images(self, story: str): + """处理定时说说配图""" + try: + enable_ai_image = bool(self.plugin.get_config("send.enable_ai_image", False)) + apikey = str(self.plugin.get_config("models.siliconflow_apikey", "")) + image_dir = str(self.plugin.get_config("send.image_directory", "./plugins/Maizone/images")) + image_num = int(self.plugin.get_config("send.ai_image_number", 1) or 1) + + if enable_ai_image and apikey: + from qzone_utils import generate_image_by_sf + await generate_image_by_sf( + api_key=apikey, + story=story, + image_dir=image_dir, + batch_size=image_num + ) + logger.info("定时任务AI配图生成完成") + elif enable_ai_image and not apikey: + logger.warning('启用了AI配图但未填写API密钥') + + except Exception as e: + logger.error(f"处理定时说说配图失败: {str(e)}") + + async def _send_scheduled_feed(self, story: str) -> bool: + """发送定时说说""" + try: + # 获取配置 + port = str(self.plugin.get_config("plugin.http_port", "3000")) + host = self.plugin.get_config("plugin.http_host", "127.0.0.1") + qq_account = config_api.get_global_config("bot.qq_account", "") + enable_image = self.plugin.get_config("send.enable_image", False) + image_dir = str(self.plugin.get_config("send.image_directory", "./plugins/Maizone/images")) + + # 创建QZone管理器并发送 + qzone_manager = QZoneManager(port, host) + success = await qzone_manager.send_feed(story, image_dir, qq_account, enable_image) + + if success: + logger.info(f"定时说说发送成功: {story}") + else: + logger.error("定时说说发送失败") + + return success + + except Exception as e: + logger.error(f"发送定时说说失败: {str(e)}") + return False + + def get_status(self) -> Dict[str, Any]: + """获取定时任务状态""" + return { + "is_running": self.is_running, + "enabled": self._is_schedule_enabled(), + "schedules": self._read_schedule_config(), + "last_send_times": self.last_send_times + } + + def add_schedule(self, time_str: str, topic: str) -> bool: + """添加定时任务""" + try: + schedules = self.plugin.get_config("schedule.schedules", []) + new_schedule = {"time": time_str, "topic": topic} + + # 检查是否已存在相同时间的任务 + for schedule in schedules: + if isinstance(schedule, dict) and schedule.get("time") == time_str: + logger.warning(f"时间 {time_str} 已存在定时任务") + return False + + schedules.append(new_schedule) + # 注意:这里需要插件系统支持动态更新配置 + logger.info(f"添加定时任务: {time_str} - {topic}") + return True + + except Exception as e: + logger.error(f"添加定时任务失败: {str(e)}") + return False + + def remove_schedule(self, time_str: str) -> bool: + """移除定时任务""" + try: + schedules = self.plugin.get_config("schedule.schedules", []) + original_count = len(schedules) + + # 过滤掉指定时间的任务 + schedules = [s for s in schedules if not (isinstance(s, dict) and s.get("time") == time_str)] + + if len(schedules) < original_count: + # 注意:这里需要插件系统支持动态更新配置 + logger.info(f"移除定时任务: {time_str}") + return True + else: + logger.warning(f"未找到时间为 {time_str} 的定时任务") + return False + + except Exception as e: + logger.error(f"移除定时任务失败: {str(e)}") + return False \ No newline at end of file diff --git a/src/plugins/built_in/WEB_SEARCH_TOOL/_manifest.json b/src/plugins/built_in/WEB_SEARCH_TOOL/_manifest.json new file mode 100644 index 000000000..fce81b291 --- /dev/null +++ b/src/plugins/built_in/WEB_SEARCH_TOOL/_manifest.json @@ -0,0 +1,27 @@ +{ + "manifest_version": 1, + "name": "web_search_tool", + "version": "1.0.0", + "description": "一个用于在互联网上搜索信息的工具", + "author": { + "name": "MaiBot-Plus开发团队", + "url": "https://github.com/MaiBot-Plus" + }, + "license": "GPL-v3.0-or-later", + + "host_application": { + "min_version": "0.10.0" + }, + "homepage_url": "https://github.com/MaiBot-Plus/mmc", + "repository_url": "https://github.com/MaiBot-Plus/mmc", + "keywords": ["web_search", "url_parser"], + "categories": ["web_search", "url_parser"], + + "default_locale": "zh-CN", + "locales_path": "_locales", + + "plugin_info": { + "is_built_in": false, + "plugin_type": "web_search" + } +} \ No newline at end of file diff --git a/src/plugins/built_in/WEB_SEARCH_TOOL/plugin.py b/src/plugins/built_in/WEB_SEARCH_TOOL/plugin.py new file mode 100644 index 000000000..04afec97b --- /dev/null +++ b/src/plugins/built_in/WEB_SEARCH_TOOL/plugin.py @@ -0,0 +1,372 @@ +import os +import asyncio +import functools +from typing import Any, Dict, List +from datetime import datetime, timedelta +from exa_py import Exa +import asyncio +from asyncddgs import aDDGS + +from src.common.logger import get_logger +from typing import Tuple,Type +from src.plugin_system import ( + BasePlugin, + register_plugin, + BaseAction, + BaseCommand, + BaseTool, + ComponentInfo, + ActionActivationType, + ConfigField, + BaseEventHandler, + llm_api, + EventType, + MaiMessages, + ToolParamType +) +from src.config.config import global_config +import httpx +from bs4 import BeautifulSoup + +logger = get_logger("web_surfing_tool") + + +class WebSurfingTool(BaseTool): + name: str = "web_search" + description: str = "当需要回答关于最新事件、实时信息或用户不清楚的特定主题时,使用此工具在互联网上搜索信息。" + parameters = [ + ("query", ToolParamType.STRING, "要搜索的关键词或问题。", True, None), + ("num_results", ToolParamType.INTEGER, "期望每个搜索引擎返回的搜索结果数量,默认为5。", False, "5"), + ("time_range", ToolParamType.STRING, "指定搜索的时间范围,可以是 'any', 'week', 'month'。默认为 'any'。", False, "any") + ] # type: ignore + + def __init__(self): + EXA_API_KEY = self.get_config("exa.api_key", None) + super().__init__() + self.exa = Exa(api_key=EXA_API_KEY) if EXA_API_KEY else None + + if not self.exa: + logger.warning("Exa API Key 未配置,Exa 搜索功能将不可用。") + + async def execute(self, function_args: Dict[str, Any]) -> Dict[str, Any]: + query = function_args.get("query") + if not query: + return {"error": "搜索查询不能为空。"} + + logger.info(f"开始并行搜索,参数: '{function_args}'") + + search_tasks = [] + if self.exa: + search_tasks.append(self._search_exa(function_args)) + search_tasks.append(self._search_ddg(function_args)) + + try: + search_results_lists = await asyncio.gather(*search_tasks, return_exceptions=True) + + all_results = [] + for result in search_results_lists: + if isinstance(result, list): + all_results.extend(result) + elif isinstance(result, Exception): + logger.error(f"搜索时发生错误: {result}") + + # 去重并格式化 + unique_results = self._deduplicate_results(all_results) + formatted_content = self._format_results(unique_results) + + result_package = { + "type": "web_search_result", + "content": formatted_content, + } + + return result_package + + except Exception as e: + logger.error(f"执行并行网络搜索时发生异常: {e}", exc_info=True) + return {"error": f"执行网络搜索时发生严重错误: {str(e)}"} + + def _deduplicate_results(self, results: List[Dict[str, Any]]) -> List[Dict[str, Any]]: + unique_urls = set() + unique_results = [] + for res in results: + if isinstance(res, dict) and res.get("url") and res["url"] not in unique_urls: + unique_urls.add(res["url"]) + unique_results.append(res) + return unique_results + + async def _search_exa(self, args: Dict[str, Any]) -> List[Dict[str, Any]]: + query = args["query"] + num_results = args.get("num_results", 3) + time_range = args.get("time_range", "any") + + exa_args = {"num_results": num_results, "text": True, "highlights": True} + if time_range != "any": + today = datetime.now() + start_date = today - timedelta(days=7 if time_range == "week" else 30) + exa_args["start_published_date"] = start_date.strftime('%Y-%m-%d') + + try: + if not self.exa: + return [] + loop = asyncio.get_running_loop() + func = functools.partial(self.exa.search_and_contents, query, **exa_args) + search_response = await loop.run_in_executor(None, func) + + return [ + { + "title": res.title, + "url": res.url, + "snippet": " ".join(getattr(res, 'highlights', [])) or (getattr(res, 'text', '')[:250] + '...'), + "provider": "Exa" + } + for res in search_response.results + ] + except Exception as e: + logger.error(f"Exa 搜索失败: {e}") + return [] + + async def _search_ddg(self, args: Dict[str, Any]) -> List[Dict[str, Any]]: + query = args["query"] + num_results = args.get("num_results", 3) + + try: + async with aDDGS() as ddgs: + search_response = await ddgs.text(query, max_results=num_results) + + return [ + { + "title": r.get("title"), + "url": r.get("href"), + "snippet": r.get("body"), + "provider": "DuckDuckGo" + } + for r in search_response + ] + except Exception as e: + logger.error(f"DuckDuckGo 搜索失败: {e}") + return [] + + def _format_results(self, results: List[Dict[str, Any]]) -> str: + if not results: + return "没有找到相关的网络信息。" + + formatted_string = "根据网络搜索结果:\n\n" + for i, res in enumerate(results, 1): + title = res.get("title", '无标题') + url = res.get("url", '#') + snippet = res.get("snippet", '无摘要') + provider = res.get("provider", "未知来源") + + formatted_string += f"{i}. **{title}** (来自: {provider})\n" + formatted_string += f" - 摘要: {snippet}\n" + formatted_string += f" - 来源: {url}\n\n" + + return formatted_string + +class URLParserTool(BaseTool): + """ + 一个用于解析和总结一个或多个网页URL内容的工具。 + """ + name: str = "parse_url" + description: str = "当需要理解一个或多个特定网页链接的内容时,使用此工具。例如:'这些网页讲了什么?[https://example.com, https://example2.com]' 或 '帮我总结一下这些文章'" + parameters = [ + ("urls", ToolParamType.STRING, "要理解的网站", True, None), + ] + def __init__(self): + super().__init__() + EXA_API_KEY = self.get_config("exa.api_key", None) + if not EXA_API_KEY or EXA_API_KEY == "YOUR_API_KEY_HERE": + self.exa = None + logger.error("Exa API Key 未配置,URL解析功能将受限。") + else: + self.exa = Exa(api_key=EXA_API_KEY) + async def _local_parse_and_summarize(self, url: str) -> Dict[str, Any]: + """ + 使用本地库(httpx, BeautifulSoup)解析URL,并调用LLM进行总结。 + """ + try: + async with httpx.AsyncClient(timeout=15.0, follow_redirects=True) as client: + response = await client.get(url) + response.raise_for_status() + + soup = BeautifulSoup(response.text, "html.parser") + + title = soup.title.string if soup.title else "无标题" + for script in soup(["script", "style"]): + script.extract() + text = soup.get_text(separator="\n", strip=True) + + if not text: + return {"error": "无法从页面提取有效文本内容。"} + + summary_prompt = f"请根据以下网页内容,生成一段不超过300字的中文摘要,保留核心信息和关键点:\n\n---\n\n标题: {title}\n\n内容:\n{text[:4000]}\n\n---\n\n摘要:" + + + text_model = str(self.get_config("models.text_model", "replyer_1")) + models = llm_api.get_available_models() + model_config = models.get(text_model) + if not model_config: + logger.error("未配置LLM模型") + return {"error": "未配置LLM模型"} + + success, summary, reasoning, model_name = await llm_api.generate_with_model( + prompt=summary_prompt, + model_config=model_config, + request_type="story.generate", + temperature=0.3, + max_tokens=1000 + ) + + if not success: + logger.info(f"生成摘要失败: {summary}") + return {"error": "发生ai错误"} + + logger.info(f"成功生成摘要内容:'{summary}'") + + return { + "title": title, + "url": url, + "snippet": summary, + "source": "local" + } + + except httpx.HTTPStatusError as e: + logger.warning(f"本地解析URL '{url}' 失败 (HTTP {e.response.status_code})") + return {"error": f"请求失败,状态码: {e.response.status_code}"} + except Exception as e: + logger.error(f"本地解析或总结URL '{url}' 时发生未知异常: {e}", exc_info=True) + return {"error": f"发生未知错误: {str(e)}"} + + async def execute(self, function_args: Dict[str, Any]) -> Dict[str, Any]: + """ + 执行URL内容提取和总结。优先使用Exa,失败后尝试本地解析。 + """ + urls = function_args.get("urls") + if not urls: + return {"error": "URL列表不能为空。"} + + + successful_results = [] + error_messages = [] + urls_to_retry_locally = [] + + # 步骤 1: 尝试使用 Exa API 进行解析 + contents_response = None + if self.exa: + logger.info(f"开始使用 Exa API 解析URL: {urls}") + try: + loop = asyncio.get_running_loop() + exa_params = {"text": True, "summary": True, "highlights": True} + func = functools.partial(self.exa.get_contents, urls, **exa_params) + contents_response = await loop.run_in_executor(None, func) + except Exception as e: + logger.error(f"执行 Exa URL解析时发生严重异常: {e}", exc_info=True) + contents_response = None # 确保异常后为None + + # 步骤 2: 处理Exa的响应 + if contents_response and hasattr(contents_response, 'statuses'): + results_map = {res.url: res for res in contents_response.results} if hasattr(contents_response, 'results') else {} + if contents_response.statuses: + for status in contents_response.statuses: + if status.status == 'success': + res = results_map.get(status.id) + if res: + summary = getattr(res, 'summary', '') + highlights = " ".join(getattr(res, 'highlights', [])) + text_snippet = (getattr(res, 'text', '')[:300] + '...') if getattr(res, 'text', '') else '' + snippet = summary or highlights or text_snippet or '无摘要' + + successful_results.append({ + "title": getattr(res, 'title', '无标题'), + "url": getattr(res, 'url', status.id), + "snippet": snippet, + "source": "exa" + }) + else: + error_tag = getattr(status, 'error', '未知错误') + logger.warning(f"Exa解析URL '{status.id}' 失败: {error_tag}。准备本地重试。") + urls_to_retry_locally.append(status.id) + else: + # 如果Exa未配置、API调用失败或返回无效响应,则所有URL都进入本地重试 + urls_to_retry_locally.extend(url for url in urls if url not in [res['url'] for res in successful_results]) + + + # 步骤 3: 对失败的URL进行本地解析 + if urls_to_retry_locally: + logger.info(f"开始本地解析以下URL: {urls_to_retry_locally}") + local_tasks = [self._local_parse_and_summarize(url) for url in urls_to_retry_locally] + local_results = await asyncio.gather(*local_tasks) + + for i, res in enumerate(local_results): + url = urls_to_retry_locally[i] + if "error" in res: + error_messages.append(f"URL: {url} - 解析失败: {res['error']}") + else: + successful_results.append(res) + + if not successful_results: + return {"error": "无法从所有给定的URL获取内容。", "details": error_messages} + + formatted_content = self._format_results(successful_results) + + result = { + "type": "url_parse_result", + "content": formatted_content, + "errors": error_messages + } + + return result + + def _format_results(self, results: List[Dict[str, Any]]) -> str: + """ + 将成功解析的结果列表格式化为一段简洁的文本。 + """ + formatted_parts = [] + for res in results: + title = res.get('title', '无标题') + url = res.get('url', '#') + snippet = res.get('snippet', '无摘要') + source = res.get('source', '未知') + + formatted_string = f"**{title}**\n" + formatted_string += f"**内容摘要**:\n{snippet}\n" + formatted_string += f"**来源**: {url} (由 {source} 解析)\n" + formatted_parts.append(formatted_string) + + return "\n---\n".join(formatted_parts) + +@register_plugin +class WEBSEARCHPLUGIN(BasePlugin): + + # 插件基本信息 + plugin_name: str = "hello_world_plugin" # 内部标识符 + enable_plugin: bool = True + dependencies: List[str] = [] # 插件依赖列表 + python_dependencies: List[str] = ["asyncddgs","exa_py"] # Python包依赖列表 + config_file_name: str = "config.toml" # 配置文件名 + + # 配置节描述 + config_section_descriptions = {"plugin": "插件基本信息", "exa": "EXA相关配置", "components": "组件设置"} + + # 配置Schema定义 + config_schema: dict = { + "plugin": { + "name": ConfigField(type=str, default="WEB_SEARCH_PLUGIN", description="插件名称"), + "version": ConfigField(type=str, default="1.0.0", description="插件版本"), + "enabled": ConfigField(type=bool, default=False, description="是否启用插件"), + }, + "exa":{ + "api_key":ConfigField(type=str, default="None", description="exa的API密钥") + }, + "components":{ + "enable_web_search_tool":ConfigField(type=bool, default=True, description="是否启用联网搜索tool"), + "enable_url_tool":ConfigField(type=bool, default=True, description="是否启用URL解析tool") + } + } + def get_plugin_components(self) -> List[Tuple[ComponentInfo, Type]]: + enable_tool =[] + if self.get_config("components.enable_web_search_tool"): + enable_tool.append((WebSurfingTool.get_tool_info(), WebSurfingTool)) + if self.get_config("components.enable_url_tool"): + enable_tool.append((URLParserTool.get_tool_info(), URLParserTool)) + return enable_tool