diff --git a/src/chat/antipromptinjector/counter_attack.py b/src/chat/antipromptinjector/counter_attack.py
index 255f8d3f3..ec2835378 100644
--- a/src/chat/antipromptinjector/counter_attack.py
+++ b/src/chat/antipromptinjector/counter_attack.py
@@ -18,10 +18,6 @@ logger = get_logger("anti_injector.counter_attack")
 class CounterAttackGenerator:
     """反击消息生成器"""
 
-    def __init__(self):
-        """初始化反击消息生成器"""
-        pass
-
     def get_personality_context(self) -> str:
         """获取人格上下文信息
 
diff --git a/src/chat/antipromptinjector/decision/counter_attack.py b/src/chat/antipromptinjector/decision/counter_attack.py
index 71c5f04ab..61ddf330b 100644
--- a/src/chat/antipromptinjector/decision/counter_attack.py
+++ b/src/chat/antipromptinjector/decision/counter_attack.py
@@ -18,9 +18,6 @@ logger = get_logger("anti_injector.counter_attack")
 class CounterAttackGenerator:
     """反击消息生成器"""
 
-    def __init__(self):
-        """初始化反击消息生成器"""
-        pass
 
     def get_personality_context(self) -> str:
         """获取人格上下文信息
diff --git a/src/chat/antipromptinjector/processors/message_processor.py b/src/chat/antipromptinjector/processors/message_processor.py
index 3d569b458..f974306bb 100644
--- a/src/chat/antipromptinjector/processors/message_processor.py
+++ b/src/chat/antipromptinjector/processors/message_processor.py
@@ -17,10 +17,6 @@ logger = get_logger("anti_injector.message_processor")
 class MessageProcessor:
     """消息内容处理器"""
 
-    def __init__(self):
-        """初始化消息处理器"""
-        pass
-
     def extract_text_content(self, message: MessageRecv) -> str:
         """提取消息中的文本内容,过滤掉引用的历史内容
 
diff --git a/src/chat/knowledge/embedding_store.py b/src/chat/knowledge/embedding_store.py
index bdd223123..c17c88177 100644
--- a/src/chat/knowledge/embedding_store.py
+++ b/src/chat/knowledge/embedding_store.py
@@ -144,8 +144,7 @@ class EmbeddingStore:
         # 确保事件循环被正确关闭
         try:
             loop.close()
-        except Exception:
-            pass
+        except Exception: ...
 
     def _get_embeddings_batch_threaded(self, strs: List[str], chunk_size: int = 10, max_workers: int = 10, progress_callback=None) -> List[Tuple[str, List[float]]]:
         """使用多线程批量获取嵌入向量
diff --git a/src/chat/knowledge/utils/json_fix.py b/src/chat/knowledge/utils/json_fix.py
index 53fa8f36f..0bf1bc49d 100644
--- a/src/chat/knowledge/utils/json_fix.py
+++ b/src/chat/knowledge/utils/json_fix.py
@@ -58,8 +58,7 @@ def fix_broken_generated_json(json_str: str) -> str:
         # Try to load the JSON to see if it is valid
         json.loads(json_str)
         return json_str  # Return as-is if valid
-    except json.JSONDecodeError:
-        pass
+    except json.JSONDecodeError: ...
 
     # Step 1: Remove trailing content after the last comma.
     last_comma_index = json_str.rfind(",")
diff --git a/src/chat/memory_system/instant_memory.py b/src/chat/memory_system/instant_memory.py
index b55784c2e..c969c70a7 100644
--- a/src/chat/memory_system/instant_memory.py
+++ b/src/chat/memory_system/instant_memory.py
@@ -25,14 +25,6 @@ class MemoryItem:
         self.keywords: list[str] = keywords
         self.create_time: float = time.time()
         self.last_view_time: float = time.time()
-
-
-class MemoryManager:
-    def __init__(self):
-        # self.memory_items:list[MemoryItem] = []
-        pass
-
-
 class InstantMemory:
     def __init__(self, chat_id):
         self.chat_id = chat_id
@@ -219,14 +211,12 @@
         try:
             dt = datetime.strptime(time_str, "%Y-%m-%d %H:%M:%S")
             return dt, dt + timedelta(hours=1)
-        except Exception:
-            pass
+        except Exception: ...
         # 具体日期
         try:
             dt = datetime.strptime(time_str, "%Y-%m-%d")
             return dt, dt + timedelta(days=1)
-        except Exception:
-            pass
+        except Exception: ...
         # 相对时间
         if time_str == "今天":
             start = now.replace(hour=0, minute=0, second=0, microsecond=0)
diff --git a/src/chat/message_receive/bot.py b/src/chat/message_receive/bot.py
index a7eea39f3..cc5b7f0f6 100644
--- a/src/chat/message_receive/bot.py
+++ b/src/chat/message_receive/bot.py
@@ -170,7 +170,7 @@ class ChatBot:
             logger.error(f"处理命令时出错: {e}")
             return False, None, True  # 出错时继续处理消息
 
-    async def hanle_notice_message(self, message: MessageRecv):
+    async def handle_notice_message(self, message: MessageRecv):
         if message.message_info.message_id == "notice":
             message.is_notify = True
             logger.info("notice消息")
@@ -265,9 +265,7 @@
         # logger.debug(str(message_data))
         message = MessageRecv(message_data)
 
-        if await self.hanle_notice_message(message):
-            # return
-            pass
+        if await self.handle_notice_message(message): ...
 
         group_info = message.message_info.group_info
         user_info = message.message_info.user_info
diff --git a/src/chat/utils/chat_message_builder.py b/src/chat/utils/chat_message_builder.py
index 842a72705..6b723cf00 100644
--- a/src/chat/utils/chat_message_builder.py
+++ b/src/chat/utils/chat_message_builder.py
@@ -791,9 +791,8 @@ def build_pic_mapping_info(pic_id_mapping: Dict[str, str]) -> str:
             image = session.execute(select(Images).where(Images.image_id == pic_id)).scalar()
             if image and image.description:
                 description = image.description
-        except Exception:
+        except Exception: ...
             # 如果查询失败,保持默认描述
-            pass
 
         mapping_lines.append(f"[{display_name}] 的内容:{description}")
 
diff --git a/src/chat/utils/prompt_builder.py b/src/chat/utils/prompt_builder.py
index 89289607d..498934339 100644
--- a/src/chat/utils/prompt_builder.py
+++ b/src/chat/utils/prompt_builder.py
@@ -70,8 +70,8 @@ class PromptContext:
                 # 如果reset失败,尝试直接设置
                 try:
                     self._current_context = previous_context
-                except Exception:
-                    pass  # 静默忽略恢复失败
+                except Exception: ...
+                # 静默忽略恢复失败
 
     async def get_prompt_async(self, name: str) -> Optional["Prompt"]:
         """异步获取当前作用域中的提示模板"""
diff --git a/src/chat/utils/utils_image.py b/src/chat/utils/utils_image.py
index 1f5bc3aec..515300e8d 100644
--- a/src/chat/utils/utils_image.py
+++ b/src/chat/utils/utils_image.py
@@ -381,8 +381,7 @@ class ImageManager:
                 # 确保是RGB格式方便比较
                 frame = gif.convert("RGB")
                 all_frames.append(frame.copy())
-        except EOFError:
-            pass  # 读完啦
+        except EOFError: ...
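Note: the recurring mechanical change in this diff replaces empty `pass` bodies in exception handlers with a one-line `...`. The two forms are behaviorally identical, since `...` is the Ellipsis literal evaluated as an expression statement, so this is purely a style change. A minimal sketch (the `risky` helper is hypothetical, for illustration only):

```python
def risky() -> None:
    raise ValueError("boom")

# Old style removed by this diff: swallow the exception with an indented `pass`.
try:
    risky()
except ValueError:
    pass

# New one-line style adopted by this diff: `...` (the Ellipsis literal) as the
# handler body. It compiles to the same "ignore the error" behavior as `pass`.
try:
    risky()
except ValueError: ...
```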
# 读完啦 if not all_frames: logger.warning("GIF中没有找到任何帧") diff --git a/src/chat/willing/mode_classical.py b/src/chat/willing/mode_classical.py index 16d67bb5e..84e8ebff8 100644 --- a/src/chat/willing/mode_classical.py +++ b/src/chat/willing/mode_classical.py @@ -44,7 +44,7 @@ class ClassicalWillingManager(BaseWillingManager): return reply_probability - async def before_generate_reply_handle(self, message_id): + async def before_generate_reply_handle(self, message_id): pass async def after_generate_reply_handle(self, message_id): diff --git a/src/config/config.py b/src/config/config.py index 93cdf33a2..c87ae5767 100644 --- a/src/config/config.py +++ b/src/config/config.py @@ -68,7 +68,7 @@ TEMPLATE_DIR = os.path.join(PROJECT_ROOT, "template") # 考虑到,实际上配置文件中的mai_version是不会自动更新的,所以采用硬编码 # 对该字段的更新,请严格参照语义化版本规范:https://semver.org/lang/zh-CN/ -MMC_VERSION = "0.10.0-snapshot.5" +MMC_VERSION = "0.10.0-alpha-1" def get_key_comment(toml_table, key): diff --git a/src/plugins/built_in/WEB_SEARCH_TOOL/bing_search.py b/src/plugins/built_in/WEB_SEARCH_TOOL/bing_search.py deleted file mode 100644 index 3158d9720..000000000 --- a/src/plugins/built_in/WEB_SEARCH_TOOL/bing_search.py +++ /dev/null @@ -1,436 +0,0 @@ -from src.common.logger import get_logger -from bs4 import BeautifulSoup -import requests -import random -import os -import traceback - -logger = get_logger("web_surfing_tool") - -ABSTRACT_MAX_LENGTH = 300 # abstract max length - -user_agents = [ - # Edge浏览器 - "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/122.0.0.0 Safari/537.36 Edg/122.0.0.0", - "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/121.0.0.0 Safari/537.36 Edg/121.0.0.0", - "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/122.0.0.0 Safari/537.36 Edg/122.0.0.0", - # Chrome浏览器 - "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/122.0.0.0 Safari/537.36", - "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/121.0.0.0 Safari/537.36", - "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/122.0.0.0 Safari/537.36", - # Firefox浏览器 - "Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:123.0) Gecko/20100101 Firefox/123.0", - "Mozilla/5.0 (Macintosh; Intel Mac OS X 10.15; rv:123.0) Gecko/20100101 Firefox/123.0", - "Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:122.0) Gecko/20100101 Firefox/122.0", - # Safari浏览器 - "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/16.6 Safari/605.1.15", - "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/17.0 Safari/605.1.15", - # 移动端浏览器 - "Mozilla/5.0 (iPhone; CPU iPhone OS 17_4 like Mac OS X) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/17.0 Mobile/15E148 Safari/604.1", - "Mozilla/5.0 (iPad; CPU OS 17_4 like Mac OS X) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/17.0 Mobile/15E148 Safari/604.1", - "Mozilla/5.0 (Linux; Android 14; SM-S918B) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/122.0.0.0 Mobile Safari/537.36", - # 搜索引擎爬虫 (模拟) - "Mozilla/5.0 (compatible; Googlebot/2.1; +http://www.google.com/bot.html)", - "Mozilla/5.0 (compatible; Bingbot/2.0; +http://www.bing.com/bingbot.htm)", - "Mozilla/5.0 (compatible; YandexBot/3.0; +http://yandex.com/bots)", -] - -# 请求头信息 -HEADERS = { - "Accept": 
"text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.7", - "Accept-Encoding": "gzip, deflate, br", - "Accept-Language": "zh-CN,zh;q=0.9,en;q=0.8,en-GB;q=0.7,en-US;q=0.6", - "Cache-Control": "max-age=0", - "Connection": "keep-alive", - "Host": "www.bing.com", - "Referer": "https://www.bing.com/", - "Sec-Ch-Ua": '"Chromium";v="122", "Microsoft Edge";v="122", "Not-A.Brand";v="99"', - "Sec-Ch-Ua-Mobile": "?0", - "Sec-Ch-Ua-Platform": '"Windows"', - "Sec-Fetch-Dest": "document", - "Sec-Fetch-Mode": "navigate", - "Sec-Fetch-Site": "same-origin", - "Sec-Fetch-User": "?1", - "Upgrade-Insecure-Requests": "1", - "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/122.0.0.0 Safari/537.36 Edg/122.0.0.0", -} - -# 替代的中国区必应请求头 -CN_BING_HEADERS = { - "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.7", - "Accept-Encoding": "gzip, deflate, br", - "Accept-Language": "zh-CN,zh;q=0.9,en;q=0.8,en-GB;q=0.7,en-US;q=0.6", - "Cache-Control": "max-age=0", - "Connection": "keep-alive", - "Host": "cn.bing.com", - "Referer": "https://cn.bing.com/", - "Sec-Ch-Ua": '"Chromium";v="122", "Microsoft Edge";v="122", "Not-A.Brand";v="99"', - "Sec-Ch-Ua-Mobile": "?0", - "Sec-Ch-Ua-Platform": '"Windows"', - "Sec-Fetch-Dest": "document", - "Sec-Fetch-Mode": "navigate", - "Sec-Fetch-Site": "same-origin", - "Sec-Fetch-User": "?1", - "Upgrade-Insecure-Requests": "1", - "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/122.0.0.0 Safari/537.36 Edg/122.0.0.0", -} - -bing_host_url = "https://www.bing.com" -bing_search_url = "https://www.bing.com/search?q=" -cn_bing_host_url = "https://cn.bing.com" -cn_bing_search_url = "https://cn.bing.com/search?q=" - - -class BingSearch: - session = requests.Session() - session.headers = HEADERS - - def search(self, keyword, num_results=10): - """ - 通过关键字进行搜索 - :param keyword: 关键字 - :param num_results: 指定返回的结果个数 - :return: 结果列表 - """ - if not keyword: - return None - - list_result = [] - page = 1 - - # 起始搜索的url - next_url = bing_search_url + keyword - - # 循环遍历每一页的搜索结果,并返回下一页的url - while len(list_result) < num_results: - data, next_url = self.parse_html(next_url, rank_start=len(list_result)) - if data: - list_result += data - logger.debug( - "---searching[{}], finish parsing page {}, results number={}: ".format(keyword, page, len(data)) - ) - for d in data: - logger.debug(str(d)) - - if not next_url: - logger.debug("already search the last page。") - break - page += 1 - - logger.debug("\n---search [{}] finished. 
total results number={}!".format(keyword, len(list_result))) - return list_result[:num_results] if len(list_result) > num_results else list_result - - def parse_html(self, url, rank_start=0, debug=0): - """ - 解析处理结果 - :param url: 需要抓取的 url - :return: 结果列表,下一页的url - """ - try: - logger.debug("--search_bing-------url: {}".format(url)) - - # 确定是国际版还是中国版必应 - is_cn_bing = "cn.bing.com" in url - - # 保存当前URL以便调试 - query_part = url.split("?q=")[1] if "?q=" in url else "unknown_query" - debug_filename = f"debug/bing_{'cn' if is_cn_bing else 'www'}_search_{query_part[:30]}.html" - - # 设置必要的Cookie - cookies = { - "SRCHHPGUSR": "SRCHLANG=zh-Hans", # 设置默认搜索语言为中文 - "SRCHD": "AF=NOFORM", - "SRCHUID": "V=2&GUID=1A4D4F1C8844493F9A2E3DB0D1BC806C", - "_SS": "SID=0D89D9A3C95C60B62E7AC80CC85461B3", - "_EDGE_S": "ui=zh-cn", # 设置界面语言为中文 - "_EDGE_V": "1", - } - - # 使用适当的请求头 - # 为每次请求随机选择不同的用户代理,降低被屏蔽风险 - headers = CN_BING_HEADERS.copy() if is_cn_bing else HEADERS.copy() - headers["User-Agent"] = random.choice(user_agents) - - # 为不同域名使用不同的Session,避免Cookie污染 - session = requests.Session() - session.headers.update(headers) - session.cookies.update(cookies) - - # 添加超时和重试,降低超时时间并允许重试 - try: - res = session.get( - url=url, timeout=(3.05, 6), verify=True, allow_redirects=True - ) # 超时分别为连接超时和读取超时 - except (requests.exceptions.Timeout, requests.exceptions.ConnectionError) as e: - # 如果第一次尝试超时,使用更宽松的设置再试一次 - logger.warning(f"第一次请求超时,正在重试: {str(e)}") - try: - # 第二次尝试使用更长的超时时间 - res = session.get(url=url, timeout=(5, 10), verify=False) # 忽略SSL验证 - except Exception as e2: - logger.error(f"第二次请求也失败: {str(e2)}") - # 如果所有尝试都失败,返回空结果 - return [], None - - res.encoding = "utf-8" - - # 保存响应内容以便调试 - os.makedirs("debug", exist_ok=True) - with open(debug_filename, "w", encoding="utf-8") as f: - f.write(res.text) - - # 检查响应状态 - logger.debug(f"--search_bing-------status_code: {res.status_code}") - if res.status_code == 403: - logger.error("被禁止访问 (403 Forbidden),可能是IP被限制") - # 如果被禁止,返回空结果 - return [], None - - if res.status_code != 200: - logger.error(f"必应搜索请求失败,状态码: {res.status_code}") - return None, None - - # 检查是否被重定向到登录页面或验证页面 - if "login.live.com" in res.url or "login.microsoftonline.com" in res.url: - logger.error("被重定向到登录页面,可能需要登录") - return None, None - - if "https://www.bing.com/ck/a" in res.url: - logger.error("被重定向到验证页面,可能被识别为机器人") - return None, None - - # 解析HTML - 添加对多种解析器的支持 - try: - # 首先尝试使用lxml解析器 - root = BeautifulSoup(res.text, "lxml") - except Exception as e: - logger.warning(f"lxml解析器不可用: {str(e)},尝试使用html.parser") - try: - # 如果lxml不可用,使用内置解析器 - root = BeautifulSoup(res.text, "html.parser") - except Exception as e2: - logger.error(f"HTML解析失败: {str(e2)}") - return None, None - - # 保存解析结果的一小部分用于调试 - sample_html = str(root)[:1000] if root else "" - logger.debug(f"HTML解析结果示例: {sample_html}") - - list_data = [] - - # 确保我们能获取到内容 - 先尝试直接提取链接 - all_links = root.find_all("a") - - # 记录链接总数,帮助诊断 - logger.debug(f"页面中总共找到了 {len(all_links)} 个链接") - - # 保存一些链接示例到日志 - sample_links = [] - for i, link in enumerate(all_links): - if i < 10: # 只记录前10个链接 - sample_links.append({"text": link.text.strip(), "href": link.get("href", "")}) - logger.debug(f"链接示例: {sample_links}") - - # 方法0:查找动态提取的结果 - # 尝试查找包含完整结果项的父容器 - result_containers = [] - # 一些可能的结果容器选择器 - container_selectors = [ - "ol#b_results", - "div.b_searchResults", - "div#b_content", - "div.srchrslt_main", - "div.mspg_cont", - "div.ms-srchResult-results", - "div#ContentAll", - "div.resultlist", - ] - - for selector in container_selectors: - containers = root.select(selector) - if 
containers: - logger.debug(f"找到可能的结果容器: {selector}, 数量: {len(containers)}") - result_containers.extend(containers) - - # 如果找到容器,尝试在容器中寻找有价值的链接 - extracted_items = [] - if result_containers: - for container in result_containers: - # 查找标题元素(h1, h2, h3, h4) - for heading in container.find_all(["h1", "h2", "h3", "h4", "strong", "b"]): - # 如果标题元素包含链接,这很可能是搜索结果的标题 - link = heading.find("a") - if link and link.get("href") and link.text.strip(): - url = link.get("href") - title = link.text.strip() - - # 如果是有效的外部链接 - if ( - not url.startswith("javascript:") - and not url.startswith("#") - and not any(x in url for x in ["bing.com/search", "bing.com/images"]) - ): - # 查找摘要:尝试找到相邻的段落元素 - abstract = "" - # 尝试在标题后面查找摘要 - next_elem = heading.next_sibling - while next_elem and not abstract: - if hasattr(next_elem, "name") and next_elem.name in ["p", "div", "span"]: - abstract = next_elem.text.strip() - break - next_elem = next_elem.next_sibling - - # 如果没找到,尝试在父元素内查找其他段落 - if not abstract: - parent = heading.parent - for p in parent.find_all( - ["p", "div"], - class_=lambda c: c - and any( - x in str(c) for x in ["desc", "abstract", "snippet", "caption", "summary"] - ), - ): - if p != heading: - abstract = p.text.strip() - break - - # 创建结果项 - extracted_items.append( - { - "title": title, - "url": url, - "abstract": abstract, - } - ) - logger.debug(f"提取到搜索结果: {title}") - - # 如果找到了结果,添加到列表 - if extracted_items: - for rank, item in enumerate(extracted_items, start=rank_start + 1): - # 裁剪摘要长度 - abstract = item["abstract"] - if ABSTRACT_MAX_LENGTH and len(abstract) > ABSTRACT_MAX_LENGTH: - abstract = abstract[:ABSTRACT_MAX_LENGTH] - - list_data.append({"title": item["title"], "abstract": abstract, "url": item["url"], "rank": rank}) - logger.debug(f"从容器中提取了 {len(list_data)} 个搜索结果") - if list_data: - return list_data, None - - # 如果上面的方法没有找到结果,尝试通用链接提取 - valid_links = [] - for link in all_links: - href = link.get("href", "") - text = link.text.strip() - - # 有效的搜索结果链接通常有这些特点 - if ( - href - and text - and len(text) > 10 # 标题通常比较长 - and not href.startswith("javascript:") - and not href.startswith("#") - and not any( - x in href - for x in [ - "bing.com/search", - "bing.com/images", - "bing.com/videos", - "bing.com/maps", - "bing.com/news", - "login", - "account", - "javascript", - "about.html", - "help.html", - "microsoft", - ] - ) - and "http" in href - ): # 必须是有效URL - valid_links.append(link) - - # 按文本长度排序,更长的文本更可能是搜索结果标题 - valid_links.sort(key=lambda x: len(x.text.strip()), reverse=True) - - if valid_links: - logger.debug(f"找到 {len(valid_links)} 个可能的搜索结果链接") - - # 提取前10个作为搜索结果 - for rank, link in enumerate(valid_links[:10], start=rank_start + 1): - href = link.get("href", "") - text = link.text.strip() - - # 获取摘要 - abstract = "" - # 尝试获取父元素的文本作为摘要 - parent = link.parent - if parent and parent.text: - full_text = parent.text.strip() - if len(full_text) > len(text): - abstract = full_text.replace(text, "", 1).strip() - - # 如果没有找到好的摘要,尝试查找相邻元素 - if len(abstract) < 20: - next_elem = link.next_sibling - while next_elem and len(abstract) < 20: - if hasattr(next_elem, "text") and next_elem.text.strip(): - abstract = next_elem.text.strip() - break - next_elem = next_elem.next_sibling - - # 裁剪摘要长度 - if ABSTRACT_MAX_LENGTH and len(abstract) > ABSTRACT_MAX_LENGTH: - abstract = abstract[:ABSTRACT_MAX_LENGTH] - - list_data.append({"title": text, "abstract": abstract, "url": href, "rank": rank}) - logger.debug(f"提取到备选搜索结果 #{rank}: {text}") - - # 如果找到了结果,返回 - if list_data: - logger.debug(f"通过备选方法提取了 {len(list_data)} 个搜索结果") - 
return list_data, None - - # 检查是否有错误消息 - error_msg = root.find("div", class_="b_searcherrmsg") - if error_msg: - logger.error(f"必应搜索返回错误: {error_msg.text.strip()}") - - # 找到下一页按钮 (尝试多种可能的选择器) - next_url = None - - # 方式1: 标准下一页按钮 - pagination_classes = ["b_widePag sb_bp", "b_pag"] - for cls in pagination_classes: - next_page = root.find("a", class_=cls) - if next_page and any(txt in next_page.text for txt in ["下一页", "Next", "下页"]): - next_url = next_page.get("href", "") - if next_url and not next_url.startswith("http"): - next_url = (cn_bing_host_url if is_cn_bing else bing_host_url) + next_url - break - - # 方式2: 备用下一页按钮 - if not next_url: - pagination = root.find_all("a", class_="sb_pagN") - if pagination: - next_url = pagination[0].get("href", "") - if next_url and not next_url.startswith("http"): - next_url = (cn_bing_host_url if is_cn_bing else bing_host_url) + next_url - - # 方式3: 通用导航元素 - if not next_url: - nav_links = root.find_all("a") - for link in nav_links: - if link.text.strip() in ["下一页", "Next", "下页", "»", ">>"]: - next_url = link.get("href", "") - if next_url and not next_url.startswith("http"): - next_url = (cn_bing_host_url if is_cn_bing else bing_host_url) + next_url - break - - logger.debug(f"已解析 {len(list_data)} 个结果,下一页链接: {next_url}") - return list_data, next_url - - except Exception as e: - logger.error(f"解析页面时出错: {str(e)}") - logger.debug(traceback.format_exc()) - return None, None diff --git a/src/plugins/built_in/WEB_SEARCH_TOOL/plugin.py b/src/plugins/built_in/WEB_SEARCH_TOOL/plugin.py index edfbf86f8..ce68576df 100644 --- a/src/plugins/built_in/WEB_SEARCH_TOOL/plugin.py +++ b/src/plugins/built_in/WEB_SEARCH_TOOL/plugin.py @@ -1,654 +1,81 @@ -import asyncio -import functools -import itertools -from typing import Any, Dict, List -from datetime import datetime, timedelta -from exa_py import Exa -from asyncddgs import aDDGS -from tavily import TavilyClient -from .bing_search import BingSearch +""" +Web Search Tool Plugin + +一个功能强大的网络搜索和URL解析插件,支持多种搜索引擎和解析策略。 +""" +from typing import List, Tuple, Type -from src.common.logger import get_logger -from typing import Tuple,Type from src.plugin_system import ( BasePlugin, register_plugin, - BaseTool, ComponentInfo, ConfigField, - llm_api, - ToolParamType, PythonDependency ) -from src.plugin_system.apis import config_api # 添加config_api导入 -from src.common.cache_manager import tool_cache -import httpx -from bs4 import BeautifulSoup +from src.plugin_system.apis import config_api +from src.common.logger import get_logger -logger = get_logger("web_surfing_tool") +from .tools.web_search import WebSurfingTool +from .tools.url_parser import URLParserTool +logger = get_logger("web_search_plugin") -class WebSurfingTool(BaseTool): - name: str = "web_search" - description: str = "用于执行网络搜索。当用户明确要求搜索,或者需要获取关于公司、产品、事件的最新信息、新闻或动态时,必须使用此工具" - available_for_llm: bool = True - parameters = [ - ("query", ToolParamType.STRING, "要搜索的关键词或问题。", True, None), - ("num_results", ToolParamType.INTEGER, "期望每个搜索引擎返回的搜索结果数量,默认为5。", False, None), - ("time_range", ToolParamType.STRING, "指定搜索的时间范围,可以是 'any', 'week', 'month'。默认为 'any'。", False, ["any", "week", "month"]) - ] # type: ignore - - def __init__(self, plugin_config=None): - super().__init__(plugin_config) - self.bing_search = BingSearch() - - # 初始化EXA API密钥轮询器 - self.exa_clients = [] - self.exa_key_cycle = None - - # 优先从主配置文件读取,如果没有则从插件配置文件读取 - EXA_API_KEYS = config_api.get_global_config("exa.api_keys", None) - if EXA_API_KEYS is None: - # 从插件配置文件读取 - EXA_API_KEYS = self.get_config("exa.api_keys", []) 
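Note: the `__init__` block being removed here (and its Tavily counterpart below) inlines the key-rotation pattern that the new `utils/api_key_manager.py` module centralizes: drop empty or `"None"` placeholder keys, build one client per key, and rotate across clients with `itertools.cycle`. A runnable sketch of the pattern, using a stand-in `Client` class instead of the real `Exa`/`TavilyClient`:

```python
import itertools

class Client:
    """Stand-in for Exa or TavilyClient; illustration only."""
    def __init__(self, api_key: str):
        self.api_key = api_key

raw_keys = ["key-a", "", "None", "key-b"]  # sample config values, not real keys

# Same validation the removed __init__ performs inline.
valid_keys = [k.strip() for k in raw_keys if isinstance(k, str) and k.strip() not in ("None", "")]
clients = [Client(k) for k in valid_keys]
key_cycle = itertools.cycle(clients) if clients else None

if key_cycle:
    # Each next() call rotates round-robin: key-a, key-b, key-a, ...
    print([next(key_cycle).api_key for _ in range(3)])
```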
- - if isinstance(EXA_API_KEYS, list) and EXA_API_KEYS: - valid_keys = [key.strip() for key in EXA_API_KEYS if isinstance(key, str) and key.strip() not in ("None", "")] - if valid_keys: - self.exa_clients = [Exa(api_key=key) for key in valid_keys] - self.exa_key_cycle = itertools.cycle(self.exa_clients) - logger.info(f"已配置 {len(valid_keys)} 个 Exa API 密钥") - else: - logger.warning("Exa API Keys 配置无效,Exa 搜索功能将不可用。") - else: - logger.warning("Exa API Keys 未配置,Exa 搜索功能将不可用。") - - # 初始化Tavily API密钥轮询器 - self.tavily_clients = [] - self.tavily_key_cycle = None - - # 优先从主配置文件读取,如果没有则从插件配置文件读取 - TAVILY_API_KEYS = config_api.get_global_config("tavily.api_keys", None) - if TAVILY_API_KEYS is None: - # 从插件配置文件读取 - TAVILY_API_KEYS = self.get_config("tavily.api_keys", []) - - if isinstance(TAVILY_API_KEYS, list) and TAVILY_API_KEYS: - valid_keys = [key.strip() for key in TAVILY_API_KEYS if isinstance(key, str) and key.strip() not in ("None", "")] - if valid_keys: - self.tavily_clients = [TavilyClient(api_key=key) for key in valid_keys] - self.tavily_key_cycle = itertools.cycle(self.tavily_clients) - logger.info(f"已配置 {len(valid_keys)} 个 Tavily API 密钥") - else: - logger.warning("Tavily API Keys 配置无效,Tavily 搜索功能将不可用。") - else: - logger.warning("Tavily API Keys 未配置,Tavily 搜索功能将不可用。") - - async def execute(self, function_args: Dict[str, Any]) -> Dict[str, Any]: - query = function_args.get("query") - if not query: - return {"error": "搜索查询不能为空。"} - - # 获取当前文件路径用于缓存键 - import os - current_file_path = os.path.abspath(__file__) - - # 检查缓存 - query = function_args.get("query") - cached_result = await tool_cache.get(self.name, function_args, current_file_path, semantic_query=query) - if cached_result: - logger.info(f"缓存命中: {self.name} -> {function_args}") - return cached_result - - # 读取搜索配置 - enabled_engines = config_api.get_global_config("web_search.enabled_engines", ["ddg"]) - search_strategy = config_api.get_global_config("web_search.search_strategy", "single") - - logger.info(f"开始搜索,策略: {search_strategy}, 启用引擎: {enabled_engines}, 参数: '{function_args}'") - - # 根据策略执行搜索 - if search_strategy == "parallel": - result = await self._execute_parallel_search(function_args, enabled_engines) - elif search_strategy == "fallback": - result = await self._execute_fallback_search(function_args, enabled_engines) - else: # single - result = await self._execute_single_search(function_args, enabled_engines) - - # 保存到缓存 - if "error" not in result: - query = function_args.get("query") - await tool_cache.set(self.name, function_args, current_file_path, result, semantic_query=query) - - return result - - async def _execute_parallel_search(self, function_args: Dict[str, Any], enabled_engines: List[str]) -> Dict[str, Any]: - """并行搜索策略:同时使用所有启用的搜索引擎""" - search_tasks = [] - - for engine in enabled_engines: - custom_args = function_args.copy() - custom_args["num_results"] = custom_args.get("num_results", 5) - if engine == "exa" and self.exa_clients: - # 使用参数中的数量,如果没有则默认5个 - search_tasks.append(self._search_exa(custom_args)) - elif engine == "tavily" and self.tavily_clients: - search_tasks.append(self._search_tavily(custom_args)) - elif engine == "ddg": - search_tasks.append(self._search_ddg(custom_args)) - elif engine == "bing": - search_tasks.append(self._search_bing(custom_args)) - - if not search_tasks: - return {"error": "没有可用的搜索引擎。"} - - try: - search_results_lists = await asyncio.gather(*search_tasks, return_exceptions=True) - - all_results = [] - for result in search_results_lists: - if isinstance(result, list): - 
all_results.extend(result) - elif isinstance(result, Exception): - logger.error(f"搜索时发生错误: {result}") - - # 去重并格式化 - unique_results = self._deduplicate_results(all_results) - formatted_content = self._format_results(unique_results) - - return { - "type": "web_search_result", - "content": formatted_content, - } - - except Exception as e: - logger.error(f"执行并行网络搜索时发生异常: {e}", exc_info=True) - return {"error": f"执行网络搜索时发生严重错误: {str(e)}"} - - async def _execute_fallback_search(self, function_args: Dict[str, Any], enabled_engines: List[str]) -> Dict[str, Any]: - """回退搜索策略:按顺序尝试搜索引擎,失败则尝试下一个""" - for engine in enabled_engines: - try: - custom_args = function_args.copy() - custom_args["num_results"] = custom_args.get("num_results", 5) - - if engine == "exa" and self.exa_clients: - results = await self._search_exa(custom_args) - elif engine == "tavily" and self.tavily_clients: - results = await self._search_tavily(custom_args) - elif engine == "ddg": - results = await self._search_ddg(custom_args) - elif engine == "bing": - results = await self._search_bing(custom_args) - else: - continue - - if results: # 如果有结果,直接返回 - formatted_content = self._format_results(results) - return { - "type": "web_search_result", - "content": formatted_content, - } - - except Exception as e: - logger.warning(f"{engine} 搜索失败,尝试下一个引擎: {e}") - continue - - return {"error": "所有搜索引擎都失败了。"} - - async def _execute_single_search(self, function_args: Dict[str, Any], enabled_engines: List[str]) -> Dict[str, Any]: - """单一搜索策略:只使用第一个可用的搜索引擎""" - for engine in enabled_engines: - custom_args = function_args.copy() - custom_args["num_results"] = custom_args.get("num_results", 5) - - try: - if engine == "exa" and self.exa_clients: - results = await self._search_exa(custom_args) - elif engine == "tavily" and self.tavily_clients: - results = await self._search_tavily(custom_args) - elif engine == "ddg": - results = await self._search_ddg(custom_args) - elif engine == "bing": - results = await self._search_bing(custom_args) - else: - continue - - formatted_content = self._format_results(results) - return { - "type": "web_search_result", - "content": formatted_content, - } - - except Exception as e: - logger.error(f"{engine} 搜索失败: {e}") - return {"error": f"{engine} 搜索失败: {str(e)}"} - - return {"error": "没有可用的搜索引擎。"} - - def _deduplicate_results(self, results: List[Dict[str, Any]]) -> List[Dict[str, Any]]: - unique_urls = set() - unique_results = [] - for res in results: - if isinstance(res, dict) and res.get("url") and res["url"] not in unique_urls: - unique_urls.add(res["url"]) - unique_results.append(res) - return unique_results - - async def _search_exa(self, args: Dict[str, Any]) -> List[Dict[str, Any]]: - query = args["query"] - num_results = args.get("num_results", 3) - time_range = args.get("time_range", "any") - - exa_args = {"num_results": num_results, "text": True, "highlights": True} - if time_range != "any": - today = datetime.now() - start_date = today - timedelta(days=7 if time_range == "week" else 30) - exa_args["start_published_date"] = start_date.strftime('%Y-%m-%d') - - try: - if not self.exa_key_cycle: - return [] - - # 使用轮询机制获取下一个客户端 - exa_client = next(self.exa_key_cycle) - loop = asyncio.get_running_loop() - func = functools.partial(exa_client.search_and_contents, query, **exa_args) - search_response = await loop.run_in_executor(None, func) - - return [ - { - "title": res.title, - "url": res.url, - "snippet": " ".join(getattr(res, 'highlights', [])) or (getattr(res, 'text', '')[:250] + '...'), - "provider": "Exa" - 
} - for res in search_response.results - ] - except Exception as e: - logger.error(f"Exa 搜索失败: {e}") - return [] - - async def _search_tavily(self, args: Dict[str, Any]) -> List[Dict[str, Any]]: - query = args["query"] - num_results = args.get("num_results", 3) - time_range = args.get("time_range", "any") - - try: - if not self.tavily_key_cycle: - return [] - - # 使用轮询机制获取下一个客户端 - tavily_client = next(self.tavily_key_cycle) - - # 构建Tavily搜索参数 - search_params = { - "query": query, - "max_results": num_results, - "search_depth": "basic", - "include_answer": False, - "include_raw_content": False - } - - # 根据时间范围调整搜索参数 - if time_range == "week": - search_params["days"] = 7 - elif time_range == "month": - search_params["days"] = 30 - - loop = asyncio.get_running_loop() - func = functools.partial(tavily_client.search, **search_params) - search_response = await loop.run_in_executor(None, func) - - results = [] - if search_response and "results" in search_response: - for res in search_response["results"]: - results.append({ - "title": res.get("title", "无标题"), - "url": res.get("url", ""), - "snippet": res.get("content", "")[:300] + "..." if res.get("content") else "无摘要", - "provider": "Tavily" - }) - - return results - - except Exception as e: - logger.error(f"Tavily 搜索失败: {e}") - return [] - - async def _search_ddg(self, args: Dict[str, Any]) -> List[Dict[str, Any]]: - query = args["query"] - num_results = args.get("num_results", 3) - - try: - async with aDDGS() as ddgs: - search_response = await ddgs.text(query, max_results=num_results) - - return [ - { - "title": r.get("title"), - "url": r.get("href"), - "snippet": r.get("body"), - "provider": "DuckDuckGo" - } - for r in search_response - ] - except Exception as e: - logger.error(f"DuckDuckGo 搜索失败: {e}") - return [] - - async def _search_bing(self, args: Dict[str, Any]) -> List[Dict[str, Any]]: - query = args["query"] - num_results = args.get("num_results", 3) - - try: - loop = asyncio.get_running_loop() - func = functools.partial(self.bing_search.search, query, num_results=num_results) - search_response = await loop.run_in_executor(None, func) - if search_response: - return [ - { - "title": r.get("title"), - "url": r.get("url"), - "snippet": r.get("abstract"), - "provider": "Bing" - } - for r in search_response - ] - except Exception as e: - logger.error(f"Bing 搜索失败: {e}") - return [] - - def _format_results(self, results: List[Dict[str, Any]]) -> str: - if not results: - return "没有找到相关的网络信息。" - - formatted_string = "根据网络搜索结果:\n\n" - for i, res in enumerate(results, 1): - title = res.get("title", '无标题') - url = res.get("url", '#') - snippet = res.get("snippet", '无摘要') - provider = res.get("provider", "未知来源") - - formatted_string += f"{i}. 
**{title}** (来自: {provider})\n" - formatted_string += f" - 摘要: {snippet}\n" - formatted_string += f" - 来源: {url}\n\n" - - return formatted_string - -class URLParserTool(BaseTool): - """ - 一个用于解析和总结一个或多个网页URL内容的工具。 - """ - name: str = "parse_url" - description: str = "当需要理解一个或多个特定网页链接的内容时,使用此工具。例如:'这些网页讲了什么?[https://example.com, https://example2.com]' 或 '帮我总结一下这些文章'" - available_for_llm: bool = True - parameters = [ - ("urls", ToolParamType.STRING, "要理解的网站", True, None), - ] - def __init__(self, plugin_config=None): - super().__init__(plugin_config) - - # 初始化EXA API密钥轮询器 - self.exa_clients = [] - self.exa_key_cycle = None - - # 优先从主配置文件读取,如果没有则从插件配置文件读取 - EXA_API_KEYS = config_api.get_global_config("exa.api_keys", None) - if EXA_API_KEYS is None: - # 从插件配置文件读取 - EXA_API_KEYS = self.get_config("exa.api_keys", []) - - if isinstance(EXA_API_KEYS, list) and EXA_API_KEYS: - valid_keys = [key.strip() for key in EXA_API_KEYS if isinstance(key, str) and key.strip() not in ("None", "")] - if valid_keys: - self.exa_clients = [Exa(api_key=key) for key in valid_keys] - self.exa_key_cycle = itertools.cycle(self.exa_clients) - logger.info(f"URL解析工具已配置 {len(valid_keys)} 个 Exa API 密钥") - else: - logger.warning("Exa API Keys 配置无效,URL解析功能将受限。") - else: - logger.warning("Exa API Keys 未配置,URL解析功能将受限。") - async def _local_parse_and_summarize(self, url: str) -> Dict[str, Any]: - """ - 使用本地库(httpx, BeautifulSoup)解析URL,并调用LLM进行总结。 - """ - try: - # 读取代理配置 - enable_proxy = self.get_config("proxy.enable_proxy", False) - proxies = None - - if enable_proxy: - socks5_proxy = self.get_config("proxy.socks5_proxy", None) - http_proxy = self.get_config("proxy.http_proxy", None) - https_proxy = self.get_config("proxy.https_proxy", None) - - # 优先使用SOCKS5代理(全协议代理) - if socks5_proxy: - proxies = socks5_proxy - logger.info(f"使用SOCKS5代理: {socks5_proxy}") - elif http_proxy or https_proxy: - proxies = {} - if http_proxy: - proxies["http://"] = http_proxy - if https_proxy: - proxies["https://"] = https_proxy - logger.info(f"使用HTTP/HTTPS代理配置: {proxies}") - - client_kwargs = {"timeout": 15.0, "follow_redirects": True} - if proxies: - client_kwargs["proxies"] = proxies - - async with httpx.AsyncClient(**client_kwargs) as client: - response = await client.get(url) - response.raise_for_status() - - soup = BeautifulSoup(response.text, "html.parser") - - title = soup.title.string if soup.title else "无标题" - for script in soup(["script", "style"]): - script.extract() - text = soup.get_text(separator="\n", strip=True) - - if not text: - return {"error": "无法从页面提取有效文本内容。"} - - summary_prompt = f"请根据以下网页内容,生成一段不超过300字的中文摘要,保留核心信息和关键点:\n\n---\n\n标题: {title}\n\n内容:\n{text[:4000]}\n\n---\n\n摘要:" - - - text_model = str(self.get_config("models.text_model", "replyer_1")) - models = llm_api.get_available_models() - model_config = models.get(text_model) - if not model_config: - logger.error("未配置LLM模型") - return {"error": "未配置LLM模型"} - - success, summary, reasoning, model_name = await llm_api.generate_with_model( - prompt=summary_prompt, - model_config=model_config, - request_type="story.generate", - temperature=0.3, - max_tokens=1000 - ) - - if not success: - logger.info(f"生成摘要失败: {summary}") - return {"error": "发生ai错误"} - - logger.info(f"成功生成摘要内容:'{summary}'") - - return { - "title": title, - "url": url, - "snippet": summary, - "source": "local" - } - - except httpx.HTTPStatusError as e: - logger.warning(f"本地解析URL '{url}' 失败 (HTTP {e.response.status_code})") - return {"error": f"请求失败,状态码: {e.response.status_code}"} - except Exception as e: - 
logger.error(f"本地解析或总结URL '{url}' 时发生未知异常: {e}", exc_info=True) - return {"error": f"发生未知错误: {str(e)}"} - - async def execute(self, function_args: Dict[str, Any]) -> Dict[str, Any]: - """ - 执行URL内容提取和总结。优先使用Exa,失败后尝试本地解析。 - """ - # 获取当前文件路径用于缓存键 - import os - current_file_path = os.path.abspath(__file__) - - # 检查缓存 - cached_result = await tool_cache.get(self.name, function_args, current_file_path) - if cached_result: - logger.info(f"缓存命中: {self.name} -> {function_args}") - return cached_result - - urls_input = function_args.get("urls") - if not urls_input: - return {"error": "URL列表不能为空。"} - - # 处理URL输入,确保是列表格式 - if isinstance(urls_input, str): - # 如果是字符串,尝试解析为URL列表 - import re - # 提取所有HTTP/HTTPS URL - url_pattern = r'https?://[^\s\],]+' - urls = re.findall(url_pattern, urls_input) - if not urls: - # 如果没有找到标准URL,将整个字符串作为单个URL - if urls_input.strip().startswith(('http://', 'https://')): - urls = [urls_input.strip()] - else: - return {"error": "提供的字符串中未找到有效的URL。"} - elif isinstance(urls_input, list): - urls = [url.strip() for url in urls_input if isinstance(url, str) and url.strip()] - else: - return {"error": "URL格式不正确,应为字符串或列表。"} - - # 验证URL格式 - valid_urls = [] - for url in urls: - if url.startswith(('http://', 'https://')): - valid_urls.append(url) - else: - logger.warning(f"跳过无效URL: {url}") - - if not valid_urls: - return {"error": "未找到有效的URL。"} - - urls = valid_urls - logger.info(f"准备解析 {len(urls)} 个URL: {urls}") - - successful_results = [] - error_messages = [] - urls_to_retry_locally = [] - - # 步骤 1: 尝试使用 Exa API 进行解析 - contents_response = None - if self.exa_key_cycle: - logger.info(f"开始使用 Exa API 解析URL: {urls}") - try: - # 使用轮询机制获取下一个客户端 - exa_client = next(self.exa_key_cycle) - loop = asyncio.get_running_loop() - exa_params = {"text": True, "summary": True, "highlights": True} - func = functools.partial(exa_client.get_contents, urls, **exa_params) - contents_response = await loop.run_in_executor(None, func) - except Exception as e: - logger.error(f"执行 Exa URL解析时发生严重异常: {e}", exc_info=True) - contents_response = None # 确保异常后为None - - # 步骤 2: 处理Exa的响应 - if contents_response and hasattr(contents_response, 'statuses'): - results_map = {res.url: res for res in contents_response.results} if hasattr(contents_response, 'results') else {} - if contents_response.statuses: - for status in contents_response.statuses: - if status.status == 'success': - res = results_map.get(status.id) - if res: - summary = getattr(res, 'summary', '') - highlights = " ".join(getattr(res, 'highlights', [])) - text_snippet = (getattr(res, 'text', '')[:300] + '...') if getattr(res, 'text', '') else '' - snippet = summary or highlights or text_snippet or '无摘要' - - successful_results.append({ - "title": getattr(res, 'title', '无标题'), - "url": getattr(res, 'url', status.id), - "snippet": snippet, - "source": "exa" - }) - else: - error_tag = getattr(status, 'error', '未知错误') - logger.warning(f"Exa解析URL '{status.id}' 失败: {error_tag}。准备本地重试。") - urls_to_retry_locally.append(status.id) - else: - # 如果Exa未配置、API调用失败或返回无效响应,则所有URL都进入本地重试 - urls_to_retry_locally.extend(url for url in urls if url not in [res['url'] for res in successful_results]) - - - # 步骤 3: 对失败的URL进行本地解析 - if urls_to_retry_locally: - logger.info(f"开始本地解析以下URL: {urls_to_retry_locally}") - local_tasks = [self._local_parse_and_summarize(url) for url in urls_to_retry_locally] - local_results = await asyncio.gather(*local_tasks) - - for i, res in enumerate(local_results): - url = urls_to_retry_locally[i] - if "error" in res: - error_messages.append(f"URL: {url} - 
解析失败: {res['error']}") - else: - successful_results.append(res) - - if not successful_results: - return {"error": "无法从所有给定的URL获取内容。", "details": error_messages} - - formatted_content = self._format_results(successful_results) - - result = { - "type": "url_parse_result", - "content": formatted_content, - "errors": error_messages - } - - # 保存到缓存 - import os - current_file_path = os.path.abspath(__file__) - if "error" not in result: - await tool_cache.set(self.name, function_args, current_file_path, result) - - return result - - def _format_results(self, results: List[Dict[str, Any]]) -> str: - """ - 将成功解析的结果列表格式化为一段简洁的文本。 - """ - formatted_parts = [] - for res in results: - title = res.get('title', '无标题') - url = res.get('url', '#') - snippet = res.get('snippet', '无摘要') - source = res.get('source', '未知') - - formatted_string = f"**{title}**\n" - formatted_string += f"**内容摘要**:\n{snippet}\n" - formatted_string += f"**来源**: {url} (由 {source} 解析)\n" - formatted_parts.append(formatted_string) - - return "\n---\n".join(formatted_parts) @register_plugin class WEBSEARCHPLUGIN(BasePlugin): + """ + 网络搜索工具插件 + + 提供网络搜索和URL解析功能,支持多种搜索引擎: + - Exa (需要API密钥) + - Tavily (需要API密钥) + - DuckDuckGo (免费) + - Bing (免费) + """ # 插件基本信息 plugin_name: str = "web_search_tool" # 内部标识符 enable_plugin: bool = True dependencies: List[str] = [] # 插件依赖列表 - # Python包依赖列表 - 支持两种格式: - # 方式1: 简单字符串列表(向后兼容) - # python_dependencies: List[str] = ["asyncddgs", "exa_py", "httpx[socks]"] - # 方式2: 详细的PythonDependency对象(推荐) + def __init__(self, *args, **kwargs): + """初始化插件,立即加载所有搜索引擎""" + super().__init__(*args, **kwargs) + + # 立即初始化所有搜索引擎,触发API密钥管理器的日志输出 + logger.info("🚀 正在初始化所有搜索引擎...") + try: + from .engines.exa_engine import ExaSearchEngine + from .engines.tavily_engine import TavilySearchEngine + from .engines.ddg_engine import DDGSearchEngine + from .engines.bing_engine import BingSearchEngine + + # 实例化所有搜索引擎,这会触发API密钥管理器的初始化 + exa_engine = ExaSearchEngine() + tavily_engine = TavilySearchEngine() + ddg_engine = DDGSearchEngine() + bing_engine = BingSearchEngine() + + # 报告每个引擎的状态 + engines_status = { + "Exa": exa_engine.is_available(), + "Tavily": tavily_engine.is_available(), + "DuckDuckGo": ddg_engine.is_available(), + "Bing": bing_engine.is_available() + } + + available_engines = [name for name, available in engines_status.items() if available] + unavailable_engines = [name for name, available in engines_status.items() if not available] + + if available_engines: + logger.info(f"✅ 可用搜索引擎: {', '.join(available_engines)}") + if unavailable_engines: + logger.info(f"❌ 不可用搜索引擎: {', '.join(unavailable_engines)}") + + except Exception as e: + logger.error(f"❌ 搜索引擎初始化失败: {e}", exc_info=True) + + # Python包依赖列表 python_dependencies: List[PythonDependency] = [ PythonDependency( package_name="asyncddgs", @@ -677,7 +104,10 @@ class WEBSEARCHPLUGIN(BasePlugin): config_file_name: str = "config.toml" # 配置文件名 # 配置节描述 - config_section_descriptions = {"plugin": "插件基本信息", "proxy": "链接本地解析代理配置"} + config_section_descriptions = { + "plugin": "插件基本信息", + "proxy": "链接本地解析代理配置" + } # 配置Schema定义 # 注意:EXA配置和组件设置已迁移到主配置文件(bot_config.toml)的[exa]和[web_search]部分 @@ -688,17 +118,43 @@ class WEBSEARCHPLUGIN(BasePlugin): "enabled": ConfigField(type=bool, default=False, description="是否启用插件"), }, "proxy": { - "http_proxy": ConfigField(type=str, default=None, description="HTTP代理地址,格式如: http://proxy.example.com:8080"), - "https_proxy": ConfigField(type=str, default=None, description="HTTPS代理地址,格式如: http://proxy.example.com:8080"), - "socks5_proxy": 
ConfigField(type=str, default=None, description="SOCKS5代理地址,格式如: socks5://proxy.example.com:1080"), - "enable_proxy": ConfigField(type=bool, default=False, description="是否启用代理") + "http_proxy": ConfigField( + type=str, + default=None, + description="HTTP代理地址,格式如: http://proxy.example.com:8080" + ), + "https_proxy": ConfigField( + type=str, + default=None, + description="HTTPS代理地址,格式如: http://proxy.example.com:8080" + ), + "socks5_proxy": ConfigField( + type=str, + default=None, + description="SOCKS5代理地址,格式如: socks5://proxy.example.com:1080" + ), + "enable_proxy": ConfigField( + type=bool, + default=False, + description="是否启用代理" + ) }, } + def get_plugin_components(self) -> List[Tuple[ComponentInfo, Type]]: - enable_tool =[] + """ + 获取插件组件列表 + + Returns: + 组件信息和类型的元组列表 + """ + enable_tool = [] + # 从主配置文件读取组件启用配置 - if config_api.get_global_config("web_search.enable_web_search_tool", True): + if True: enable_tool.append((WebSurfingTool.get_tool_info(), WebSurfingTool)) + if config_api.get_global_config("web_search.enable_url_tool", True): enable_tool.append((URLParserTool.get_tool_info(), URLParserTool)) + return enable_tool diff --git a/src/plugins/built_in/web_search_tool/engines/__init__.py b/src/plugins/built_in/web_search_tool/engines/__init__.py new file mode 100644 index 000000000..2f1c3492c --- /dev/null +++ b/src/plugins/built_in/web_search_tool/engines/__init__.py @@ -0,0 +1,3 @@ +""" +Search engines package +""" diff --git a/src/plugins/built_in/web_search_tool/engines/base.py b/src/plugins/built_in/web_search_tool/engines/base.py new file mode 100644 index 000000000..f7641aa2f --- /dev/null +++ b/src/plugins/built_in/web_search_tool/engines/base.py @@ -0,0 +1,31 @@ +""" +Base search engine interface +""" +from abc import ABC, abstractmethod +from typing import Dict, List, Any + + +class BaseSearchEngine(ABC): + """ + 搜索引擎基类 + """ + + @abstractmethod + async def search(self, args: Dict[str, Any]) -> List[Dict[str, Any]]: + """ + 执行搜索 + + Args: + args: 搜索参数,包含 query、num_results、time_range 等 + + Returns: + 搜索结果列表,每个结果包含 title、url、snippet、provider 字段 + """ + pass + + @abstractmethod + def is_available(self) -> bool: + """ + 检查搜索引擎是否可用 + """ + pass diff --git a/src/plugins/built_in/web_search_tool/engines/bing_engine.py b/src/plugins/built_in/web_search_tool/engines/bing_engine.py new file mode 100644 index 000000000..5ae226404 --- /dev/null +++ b/src/plugins/built_in/web_search_tool/engines/bing_engine.py @@ -0,0 +1,265 @@ +""" +Bing search engine implementation +""" +import asyncio +import functools +import random +import os +import traceback +from typing import Dict, List, Any +from datetime import datetime, timedelta +import requests +from bs4 import BeautifulSoup + +from src.common.logger import get_logger +from .base import BaseSearchEngine + +logger = get_logger("bing_engine") + +ABSTRACT_MAX_LENGTH = 300 # abstract max length + +user_agents = [ + # Edge浏览器 + "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/122.0.0.0 Safari/537.36 Edg/122.0.0.0", + "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/121.0.0.0 Safari/537.36 Edg/121.0.0.0", + "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/122.0.0.0 Safari/537.36 Edg/122.0.0.0", + # Chrome浏览器 + "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/122.0.0.0 Safari/537.36", + "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/121.0.0.0 
Safari/537.36", + "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/122.0.0.0 Safari/537.36", + # Firefox浏览器 + "Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:123.0) Gecko/20100101 Firefox/123.0", + "Mozilla/5.0 (Macintosh; Intel Mac OS X 10.15; rv:123.0) Gecko/20100101 Firefox/123.0", + "Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:122.0) Gecko/20100101 Firefox/122.0", +] + +# 请求头信息 +HEADERS = { + "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.7", + "Accept-Encoding": "gzip, deflate, br", + "Accept-Language": "zh-CN,zh;q=0.9,en;q=0.8,en-GB;q=0.7,en-US;q=0.6", + "Cache-Control": "max-age=0", + "Connection": "keep-alive", + "Host": "www.bing.com", + "Referer": "https://www.bing.com/", + "Sec-Ch-Ua": '"Chromium";v="122", "Microsoft Edge";v="122", "Not-A.Brand";v="99"', + "Sec-Ch-Ua-Mobile": "?0", + "Sec-Ch-Ua-Platform": '"Windows"', + "Sec-Fetch-Dest": "document", + "Sec-Fetch-Mode": "navigate", + "Sec-Fetch-Site": "same-origin", + "Sec-Fetch-User": "?1", + "Upgrade-Insecure-Requests": "1", + "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/122.0.0.0 Safari/537.36 Edg/122.0.0.0", +} + +bing_search_url = "https://www.bing.com/search?q=" + + +class BingSearchEngine(BaseSearchEngine): + """ + Bing搜索引擎实现 + """ + + def __init__(self): + self.session = requests.Session() + self.session.headers = HEADERS + + def is_available(self) -> bool: + """检查Bing搜索引擎是否可用""" + return True # Bing是免费搜索引擎,总是可用 + + async def search(self, args: Dict[str, Any]) -> List[Dict[str, Any]]: + """执行Bing搜索""" + query = args["query"] + num_results = args.get("num_results", 3) + time_range = args.get("time_range", "any") + + try: + loop = asyncio.get_running_loop() + func = functools.partial(self._search_sync, query, num_results, time_range) + search_response = await loop.run_in_executor(None, func) + return search_response + except Exception as e: + logger.error(f"Bing 搜索失败: {e}") + return [] + + def _search_sync(self, keyword: str, num_results: int, time_range: str) -> List[Dict[str, Any]]: + """同步执行Bing搜索""" + if not keyword: + return [] + + list_result = [] + + # 构建搜索URL + search_url = bing_search_url + keyword + + # 如果指定了时间范围,添加时间过滤参数 + if time_range == "week": + search_url += "&qft=+filterui:date-range-7" + elif time_range == "month": + search_url += "&qft=+filterui:date-range-30" + + try: + data = self._parse_html(search_url) + if data: + list_result.extend(data) + logger.debug(f"Bing搜索 [{keyword}] 找到 {len(data)} 个结果") + + except Exception as e: + logger.error(f"Bing搜索解析失败: {e}") + return [] + + logger.debug(f"Bing搜索 [{keyword}] 完成,总共 {len(list_result)} 个结果") + return list_result[:num_results] if len(list_result) > num_results else list_result + + def _parse_html(self, url: str) -> List[Dict[str, Any]]: + """解析处理结果""" + try: + logger.debug(f"访问Bing搜索URL: {url}") + + # 设置必要的Cookie + cookies = { + "SRCHHPGUSR": "SRCHLANG=zh-Hans", # 设置默认搜索语言为中文 + "SRCHD": "AF=NOFORM", + "SRCHUID": "V=2&GUID=1A4D4F1C8844493F9A2E3DB0D1BC806C", + "_SS": "SID=0D89D9A3C95C60B62E7AC80CC85461B3", + "_EDGE_S": "ui=zh-cn", # 设置界面语言为中文 + "_EDGE_V": "1", + } + + # 为每次请求随机选择不同的用户代理,降低被屏蔽风险 + headers = HEADERS.copy() + headers["User-Agent"] = random.choice(user_agents) + + # 创建新的session + session = requests.Session() + session.headers.update(headers) + session.cookies.update(cookies) + + # 发送请求 + try: + res = session.get(url=url, timeout=(3.05, 6), 
verify=True, allow_redirects=True) + except (requests.exceptions.Timeout, requests.exceptions.ConnectionError) as e: + logger.warning(f"第一次请求超时,正在重试: {str(e)}") + try: + res = session.get(url=url, timeout=(5, 10), verify=False) + except Exception as e2: + logger.error(f"第二次请求也失败: {str(e2)}") + return [] + + res.encoding = "utf-8" + + # 检查响应状态 + if res.status_code == 403: + logger.error("被禁止访问 (403 Forbidden),可能是IP被限制") + return [] + + if res.status_code != 200: + logger.error(f"必应搜索请求失败,状态码: {res.status_code}") + return [] + + # 检查是否被重定向到登录页面或验证页面 + if "login.live.com" in res.url or "login.microsoftonline.com" in res.url: + logger.error("被重定向到登录页面,可能需要登录") + return [] + + if "https://www.bing.com/ck/a" in res.url: + logger.error("被重定向到验证页面,可能被识别为机器人") + return [] + + # 解析HTML + try: + root = BeautifulSoup(res.text, "lxml") + except Exception: + try: + root = BeautifulSoup(res.text, "html.parser") + except Exception as e: + logger.error(f"HTML解析失败: {str(e)}") + return [] + + list_data = [] + + # 尝试提取搜索结果 + # 方法1: 查找标准的搜索结果容器 + results = root.select("ol#b_results li.b_algo") + + if results: + for rank, result in enumerate(results, 1): + # 提取标题和链接 + title_link = result.select_one("h2 a") + if not title_link: + continue + + title = title_link.get_text().strip() + url = title_link.get("href", "") + + # 提取摘要 + abstract = "" + abstract_elem = result.select_one("div.b_caption p") + if abstract_elem: + abstract = abstract_elem.get_text().strip() + + # 限制摘要长度 + if ABSTRACT_MAX_LENGTH and len(abstract) > ABSTRACT_MAX_LENGTH: + abstract = abstract[:ABSTRACT_MAX_LENGTH] + "..." + + list_data.append({ + "title": title, + "url": url, + "snippet": abstract, + "provider": "Bing" + }) + + if len(list_data) >= 10: # 限制结果数量 + break + + # 方法2: 如果标准方法没找到结果,使用备用方法 + if not list_data: + # 查找所有可能的搜索结果链接 + all_links = root.find_all("a") + + for link in all_links: + href = link.get("href", "") + text = link.get_text().strip() + + # 过滤有效的搜索结果链接 + if (href and text and len(text) > 10 + and not href.startswith("javascript:") + and not href.startswith("#") + and "http" in href + and not any(x in href for x in [ + "bing.com/search", "bing.com/images", "bing.com/videos", + "bing.com/maps", "bing.com/news", "login", "account", + "microsoft", "javascript" + ])): + + # 尝试获取摘要 + abstract = "" + parent = link.parent + if parent and parent.get_text(): + full_text = parent.get_text().strip() + if len(full_text) > len(text): + abstract = full_text.replace(text, "", 1).strip() + + # 限制摘要长度 + if ABSTRACT_MAX_LENGTH and len(abstract) > ABSTRACT_MAX_LENGTH: + abstract = abstract[:ABSTRACT_MAX_LENGTH] + "..." 
+ + list_data.append({ + "title": text, + "url": href, + "snippet": abstract, + "provider": "Bing" + }) + + if len(list_data) >= 10: + break + + logger.debug(f"从Bing解析到 {len(list_data)} 个搜索结果") + return list_data + + except Exception as e: + logger.error(f"解析Bing页面时出错: {str(e)}") + logger.debug(traceback.format_exc()) + return [] diff --git a/src/plugins/built_in/web_search_tool/engines/ddg_engine.py b/src/plugins/built_in/web_search_tool/engines/ddg_engine.py new file mode 100644 index 000000000..011935e27 --- /dev/null +++ b/src/plugins/built_in/web_search_tool/engines/ddg_engine.py @@ -0,0 +1,42 @@ +""" +DuckDuckGo search engine implementation +""" +from typing import Dict, List, Any +from asyncddgs import aDDGS + +from src.common.logger import get_logger +from .base import BaseSearchEngine + +logger = get_logger("ddg_engine") + + +class DDGSearchEngine(BaseSearchEngine): + """ + DuckDuckGo搜索引擎实现 + """ + + def is_available(self) -> bool: + """检查DuckDuckGo搜索引擎是否可用""" + return True # DuckDuckGo不需要API密钥,总是可用 + + async def search(self, args: Dict[str, Any]) -> List[Dict[str, Any]]: + """执行DuckDuckGo搜索""" + query = args["query"] + num_results = args.get("num_results", 3) + + try: + async with aDDGS() as ddgs: + search_response = await ddgs.text(query, max_results=num_results) + + return [ + { + "title": r.get("title"), + "url": r.get("href"), + "snippet": r.get("body"), + "provider": "DuckDuckGo" + } + for r in search_response + ] + except Exception as e: + logger.error(f"DuckDuckGo 搜索失败: {e}") + return [] diff --git a/src/plugins/built_in/web_search_tool/engines/exa_engine.py b/src/plugins/built_in/web_search_tool/engines/exa_engine.py new file mode 100644 index 000000000..2bb515e8e --- /dev/null +++ b/src/plugins/built_in/web_search_tool/engines/exa_engine.py @@ -0,0 +1,79 @@ +""" +Exa search engine implementation +""" +import asyncio +import functools +from datetime import datetime, timedelta +from typing import Dict, List, Any +from exa_py import Exa + +from src.common.logger import get_logger +from src.plugin_system.apis import config_api +from .base import BaseSearchEngine +from ..utils.api_key_manager import create_api_key_manager_from_config + +logger = get_logger("exa_engine") + + +class ExaSearchEngine(BaseSearchEngine): + """ + Exa搜索引擎实现 + """ + + def __init__(self): + self._initialize_clients() + + def _initialize_clients(self): + """初始化Exa客户端""" + # 从主配置文件读取API密钥 + exa_api_keys = config_api.get_global_config("exa.api_keys", None) + + # 创建API密钥管理器 + self.api_manager = create_api_key_manager_from_config( + exa_api_keys, + lambda key: Exa(api_key=key), + "Exa" + ) + + def is_available(self) -> bool: + """检查Exa搜索引擎是否可用""" + return self.api_manager.is_available() + + async def search(self, args: Dict[str, Any]) -> List[Dict[str, Any]]: + """执行Exa搜索""" + if not self.is_available(): + return [] + + query = args["query"] + num_results = args.get("num_results", 3) + time_range = args.get("time_range", "any") + + exa_args = {"num_results": num_results, "text": True, "highlights": True} + if time_range != "any": + today = datetime.now() + start_date = today - timedelta(days=7 if time_range == "week" else 30) + exa_args["start_published_date"] = start_date.strftime('%Y-%m-%d') + + try: + # 使用API密钥管理器获取下一个客户端 + exa_client = self.api_manager.get_next_client() + if not exa_client: + logger.error("无法获取Exa客户端") + return [] + + loop = asyncio.get_running_loop() + func = functools.partial(exa_client.search_and_contents, query, **exa_args) + search_response = await loop.run_in_executor(None, func) 
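Note: `create_api_key_manager_from_config` is imported from `..utils.api_key_manager`, a new module that is not included in this diff. Judging only from how the engines call it (`is_available()` and `get_next_client()`), a minimal sketch could look like the following; the `ApiKeyManager` class name and its internals are assumptions, not the actual implementation:

```python
import itertools
from typing import Any, Callable, List, Optional


class ApiKeyManager:
    """Hypothetical round-robin client pool; the real module is not shown in this diff."""

    def __init__(self, clients: List[Any]):
        self._cycle = itertools.cycle(clients) if clients else None

    def is_available(self) -> bool:
        return self._cycle is not None

    def get_next_client(self) -> Optional[Any]:
        return next(self._cycle) if self._cycle else None


def create_api_key_manager_from_config(
    keys: Optional[List[str]],
    client_factory: Callable[[str], Any],
    name: str,
) -> ApiKeyManager:
    # Mirror the key filtering the old plugin.py performed inline; `name` would
    # presumably only be used for logging.
    valid = [k.strip() for k in (keys or []) if isinstance(k, str) and k.strip() not in ("None", "")]
    return ApiKeyManager([client_factory(k) for k in valid])
```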
+ + return [ + { + "title": res.title, + "url": res.url, + "snippet": " ".join(getattr(res, 'highlights', [])) or (getattr(res, 'text', '')[:250] + '...'), + "provider": "Exa" + } + for res in search_response.results + ] + except Exception as e: + logger.error(f"Exa 搜索失败: {e}") + return [] diff --git a/src/plugins/built_in/web_search_tool/engines/tavily_engine.py b/src/plugins/built_in/web_search_tool/engines/tavily_engine.py new file mode 100644 index 000000000..affb303fc --- /dev/null +++ b/src/plugins/built_in/web_search_tool/engines/tavily_engine.py @@ -0,0 +1,90 @@ +""" +Tavily search engine implementation +""" +import asyncio +import functools +from typing import Dict, List, Any +from tavily import TavilyClient + +from src.common.logger import get_logger +from src.plugin_system.apis import config_api +from .base import BaseSearchEngine +from ..utils.api_key_manager import create_api_key_manager_from_config + +logger = get_logger("tavily_engine") + + +class TavilySearchEngine(BaseSearchEngine): + """ + Tavily搜索引擎实现 + """ + + def __init__(self): + self._initialize_clients() + + def _initialize_clients(self): + """初始化Tavily客户端""" + # 从主配置文件读取API密钥 + tavily_api_keys = config_api.get_global_config("tavily.api_keys", None) + + # 创建API密钥管理器 + self.api_manager = create_api_key_manager_from_config( + tavily_api_keys, + lambda key: TavilyClient(api_key=key), + "Tavily" + ) + + def is_available(self) -> bool: + """检查Tavily搜索引擎是否可用""" + return self.api_manager.is_available() + + async def search(self, args: Dict[str, Any]) -> List[Dict[str, Any]]: + """执行Tavily搜索""" + if not self.is_available(): + return [] + + query = args["query"] + num_results = args.get("num_results", 3) + time_range = args.get("time_range", "any") + + try: + # 使用API密钥管理器获取下一个客户端 + tavily_client = self.api_manager.get_next_client() + if not tavily_client: + logger.error("无法获取Tavily客户端") + return [] + + # 构建Tavily搜索参数 + search_params = { + "query": query, + "max_results": num_results, + "search_depth": "basic", + "include_answer": False, + "include_raw_content": False + } + + # 根据时间范围调整搜索参数 + if time_range == "week": + search_params["days"] = 7 + elif time_range == "month": + search_params["days"] = 30 + + loop = asyncio.get_running_loop() + func = functools.partial(tavily_client.search, **search_params) + search_response = await loop.run_in_executor(None, func) + + results = [] + if search_response and "results" in search_response: + for res in search_response["results"]: + results.append({ + "title": res.get("title", "无标题"), + "url": res.get("url", ""), + "snippet": res.get("content", "")[:300] + "..." 
if res.get("content") else "无摘要", + "provider": "Tavily" + }) + + return results + + except Exception as e: + logger.error(f"Tavily 搜索失败: {e}") + return [] diff --git a/src/plugins/built_in/web_search_tool/plugin.py b/src/plugins/built_in/web_search_tool/plugin.py new file mode 100644 index 000000000..ce68576df --- /dev/null +++ b/src/plugins/built_in/web_search_tool/plugin.py @@ -0,0 +1,160 @@ +""" +Web Search Tool Plugin + +一个功能强大的网络搜索和URL解析插件,支持多种搜索引擎和解析策略。 +""" +from typing import List, Tuple, Type + +from src.plugin_system import ( + BasePlugin, + register_plugin, + ComponentInfo, + ConfigField, + PythonDependency +) +from src.plugin_system.apis import config_api +from src.common.logger import get_logger + +from .tools.web_search import WebSurfingTool +from .tools.url_parser import URLParserTool + +logger = get_logger("web_search_plugin") + + +@register_plugin +class WEBSEARCHPLUGIN(BasePlugin): + """ + 网络搜索工具插件 + + 提供网络搜索和URL解析功能,支持多种搜索引擎: + - Exa (需要API密钥) + - Tavily (需要API密钥) + - DuckDuckGo (免费) + - Bing (免费) + """ + + # 插件基本信息 + plugin_name: str = "web_search_tool" # 内部标识符 + enable_plugin: bool = True + dependencies: List[str] = [] # 插件依赖列表 + + def __init__(self, *args, **kwargs): + """初始化插件,立即加载所有搜索引擎""" + super().__init__(*args, **kwargs) + + # 立即初始化所有搜索引擎,触发API密钥管理器的日志输出 + logger.info("🚀 正在初始化所有搜索引擎...") + try: + from .engines.exa_engine import ExaSearchEngine + from .engines.tavily_engine import TavilySearchEngine + from .engines.ddg_engine import DDGSearchEngine + from .engines.bing_engine import BingSearchEngine + + # 实例化所有搜索引擎,这会触发API密钥管理器的初始化 + exa_engine = ExaSearchEngine() + tavily_engine = TavilySearchEngine() + ddg_engine = DDGSearchEngine() + bing_engine = BingSearchEngine() + + # 报告每个引擎的状态 + engines_status = { + "Exa": exa_engine.is_available(), + "Tavily": tavily_engine.is_available(), + "DuckDuckGo": ddg_engine.is_available(), + "Bing": bing_engine.is_available() + } + + available_engines = [name for name, available in engines_status.items() if available] + unavailable_engines = [name for name, available in engines_status.items() if not available] + + if available_engines: + logger.info(f"✅ 可用搜索引擎: {', '.join(available_engines)}") + if unavailable_engines: + logger.info(f"❌ 不可用搜索引擎: {', '.join(unavailable_engines)}") + + except Exception as e: + logger.error(f"❌ 搜索引擎初始化失败: {e}", exc_info=True) + + # Python包依赖列表 + python_dependencies: List[PythonDependency] = [ + PythonDependency( + package_name="asyncddgs", + description="异步DuckDuckGo搜索库", + optional=False + ), + PythonDependency( + package_name="exa_py", + description="Exa搜索API客户端库", + optional=True # 如果没有API密钥,这个是可选的 + ), + PythonDependency( + package_name="tavily", + install_name="tavily-python", # 安装时使用这个名称 + description="Tavily搜索API客户端库", + optional=True # 如果没有API密钥,这个是可选的 + ), + PythonDependency( + package_name="httpx", + version=">=0.20.0", + install_name="httpx[socks]", # 安装时使用这个名称(包含可选依赖) + description="支持SOCKS代理的HTTP客户端库", + optional=False + ) + ] + config_file_name: str = "config.toml" # 配置文件名 + + # 配置节描述 + config_section_descriptions = { + "plugin": "插件基本信息", + "proxy": "链接本地解析代理配置" + } + + # 配置Schema定义 + # 注意:EXA配置和组件设置已迁移到主配置文件(bot_config.toml)的[exa]和[web_search]部分 + config_schema: dict = { + "plugin": { + "name": ConfigField(type=str, default="WEB_SEARCH_PLUGIN", description="插件名称"), + "version": ConfigField(type=str, default="1.0.0", description="插件版本"), + "enabled": ConfigField(type=bool, default=False, description="是否启用插件"), + }, + "proxy": { + "http_proxy": ConfigField( + type=str, + 
default=None,
+                description="HTTP代理地址,格式如: http://proxy.example.com:8080"
+            ),
+            "https_proxy": ConfigField(
+                type=str,
+                default=None,
+                description="HTTPS代理地址,格式如: http://proxy.example.com:8080"
+            ),
+            "socks5_proxy": ConfigField(
+                type=str,
+                default=None,
+                description="SOCKS5代理地址,格式如: socks5://proxy.example.com:1080"
+            ),
+            "enable_proxy": ConfigField(
+                type=bool,
+                default=False,
+                description="是否启用代理"
+            )
+        },
+    }
+
+    def get_plugin_components(self) -> List[Tuple[ComponentInfo, Type]]:
+        """
+        获取插件组件列表
+
+        Returns:
+            组件信息和类型的元组列表
+        """
+        enable_tool = []
+
+        # 从主配置文件读取组件启用配置
+        # web_search 工具无独立配置开关,始终注册
+        enable_tool.append((WebSurfingTool.get_tool_info(), WebSurfingTool))
+
+        if config_api.get_global_config("web_search.enable_url_tool", True):
+            enable_tool.append((URLParserTool.get_tool_info(), URLParserTool))
+
+        return enable_tool
diff --git a/src/plugins/built_in/web_search_tool/tools/__init__.py b/src/plugins/built_in/web_search_tool/tools/__init__.py
new file mode 100644
index 000000000..480099acd
--- /dev/null
+++ b/src/plugins/built_in/web_search_tool/tools/__init__.py
@@ -0,0 +1,3 @@
+"""
+Tools package
+"""
diff --git a/src/plugins/built_in/web_search_tool/tools/url_parser.py b/src/plugins/built_in/web_search_tool/tools/url_parser.py
new file mode 100644
index 000000000..d5cb41fdb
--- /dev/null
+++ b/src/plugins/built_in/web_search_tool/tools/url_parser.py
@@ -0,0 +1,242 @@
+"""
+URL parser tool implementation
+"""
+import asyncio
+import functools
+from typing import Any, Dict, List
+from exa_py import Exa
+import httpx
+from bs4 import BeautifulSoup
+
+from src.common.logger import get_logger
+from src.plugin_system import BaseTool, ToolParamType, llm_api
+from src.plugin_system.apis import config_api
+from src.common.cache_manager import tool_cache
+
+from ..utils.formatters import format_url_parse_results
+from ..utils.url_utils import parse_urls_from_input, validate_urls
+from ..utils.api_key_manager import create_api_key_manager_from_config
+
+logger = get_logger("url_parser_tool")
+
+
+class URLParserTool(BaseTool):
+    """
+    一个用于解析和总结一个或多个网页URL内容的工具。
+    """
+    name: str = "parse_url"
+    description: str = "当需要理解一个或多个特定网页链接的内容时,使用此工具。例如:'这些网页讲了什么?[https://example.com, https://example2.com]' 或 '帮我总结一下这些文章'"
+    available_for_llm: bool = True
+    parameters = [
+        ("urls", ToolParamType.STRING, "需要解析的一个或多个网页URL", True, None),
+    ]
+
+    def __init__(self, plugin_config=None):
+        super().__init__(plugin_config)
+        self._initialize_exa_clients()
+
+    def _initialize_exa_clients(self):
+        """初始化Exa客户端"""
+        # 优先从主配置文件读取,如果没有则从插件配置文件读取
+        exa_api_keys = config_api.get_global_config("exa.api_keys", None)
+        if exa_api_keys is None:
+            # 从插件配置文件读取
+            exa_api_keys = self.get_config("exa.api_keys", [])
+
+        # 创建API密钥管理器
+        self.api_manager = create_api_key_manager_from_config(
+            exa_api_keys,
+            lambda key: Exa(api_key=key),
+            "Exa URL Parser"
+        )
+
+    async def _local_parse_and_summarize(self, url: str) -> Dict[str, Any]:
+        """
+        使用本地库(httpx, BeautifulSoup)解析URL,并调用LLM进行总结。
+        """
+        try:
+            # 读取代理配置
+            enable_proxy = self.get_config("proxy.enable_proxy", False)
+            proxies = None
+
+            if enable_proxy:
+                socks5_proxy = self.get_config("proxy.socks5_proxy", None)
+                http_proxy = self.get_config("proxy.http_proxy", None)
+                https_proxy = self.get_config("proxy.https_proxy", None)
+
+                # 优先使用SOCKS5代理(全协议代理)
+                if socks5_proxy:
+                    proxies = socks5_proxy
+                    logger.info(f"使用SOCKS5代理: {socks5_proxy}")
+                elif http_proxy or https_proxy:
+                    proxies = {}
+                    if http_proxy:
+                        proxies["http://"] = http_proxy
+                    if https_proxy:
+                        proxies["https://"] = https_proxy
+                    logger.info(f"使用HTTP/HTTPS代理配置: {proxies}")
+
+            client_kwargs = {"timeout": 15.0, "follow_redirects": True}
+            if proxies:
+                client_kwargs["proxies"] = proxies
+
+            async with httpx.AsyncClient(**client_kwargs) as client:
+                response = await client.get(url)
+                response.raise_for_status()
+
+            soup = BeautifulSoup(response.text, "html.parser")
+
+            title = soup.title.string if soup.title else "无标题"
+            for script in soup(["script", "style"]):
+                script.extract()
+            text = soup.get_text(separator="\n", strip=True)
+
+            if not text:
+                return {"error": "无法从页面提取有效文本内容。"}
+
+            summary_prompt = f"请根据以下网页内容,生成一段不超过300字的中文摘要,保留核心信息和关键点:\n\n---\n\n标题: {title}\n\n内容:\n{text[:4000]}\n\n---\n\n摘要:"
+
+            text_model = str(self.get_config("models.text_model", "replyer_1"))
+            models = llm_api.get_available_models()
+            model_config = models.get(text_model)
+            if not model_config:
+                logger.error("未配置LLM模型")
+                return {"error": "未配置LLM模型"}
+
+            success, summary, reasoning, model_name = await llm_api.generate_with_model(
+                prompt=summary_prompt,
+                model_config=model_config,
+                request_type="story.generate",
+                temperature=0.3,
+                max_tokens=1000
+            )
+
+            if not success:
+                logger.warning(f"生成摘要失败: {summary}")
+                return {"error": "LLM生成摘要失败"}
+
+            logger.info(f"成功生成摘要内容:'{summary}'")
+
+            return {
+                "title": title,
+                "url": url,
+                "snippet": summary,
+                "source": "local"
+            }
+
+        except httpx.HTTPStatusError as e:
+            logger.warning(f"本地解析URL '{url}' 失败 (HTTP {e.response.status_code})")
+            return {"error": f"请求失败,状态码: {e.response.status_code}"}
+        except Exception as e:
+            logger.error(f"本地解析或总结URL '{url}' 时发生未知异常: {e}", exc_info=True)
+            return {"error": f"发生未知错误: {str(e)}"}
+
+    async def execute(self, function_args: Dict[str, Any]) -> Dict[str, Any]:
+        """
+        执行URL内容提取和总结。优先使用Exa,失败后尝试本地解析。
+        """
+        # 获取当前文件路径用于缓存键
+        import os
+        current_file_path = os.path.abspath(__file__)
+
+        # 检查缓存
+        cached_result = await tool_cache.get(self.name, function_args, current_file_path)
+        if cached_result:
+            logger.info(f"缓存命中: {self.name} -> {function_args}")
+            return cached_result
+
+        urls_input = function_args.get("urls")
+        if not urls_input:
+            return {"error": "URL列表不能为空。"}
+
+        # 处理URL输入,确保是列表格式
+        urls = parse_urls_from_input(urls_input)
+        if not urls:
+            return {"error": "提供的字符串中未找到有效的URL。"}
+
+        # 验证URL格式
+        valid_urls = validate_urls(urls)
+        if not valid_urls:
+            return {"error": "未找到有效的URL。"}
+
+        urls = valid_urls
+        logger.info(f"准备解析 {len(urls)} 个URL: {urls}")
+
+        successful_results = []
+        error_messages = []
+        urls_to_retry_locally = []
+
+        # 步骤 1: 尝试使用 Exa API 进行解析
+        contents_response = None
+        if self.api_manager.is_available():
+            logger.info(f"开始使用 Exa API 解析URL: {urls}")
+            try:
+                # 使用API密钥管理器获取下一个客户端
+                exa_client = self.api_manager.get_next_client()
+                if not exa_client:
+                    logger.error("无法获取Exa客户端")
+                else:
+                    loop = asyncio.get_running_loop()
+                    exa_params = {"text": True, "summary": True, "highlights": True}
+                    func = functools.partial(exa_client.get_contents, urls, **exa_params)
+                    contents_response = await loop.run_in_executor(None, func)
+            except Exception as e:
+                logger.error(f"执行 Exa URL解析时发生严重异常: {e}", exc_info=True)
+                contents_response = None # 确保异常后为None
+
+        # 步骤 2: 处理Exa的响应
+        if contents_response and hasattr(contents_response, 'statuses'):
+            results_map = {res.url: res for res in contents_response.results} if hasattr(contents_response, 'results') else {}
+            if contents_response.statuses:
+                for status in contents_response.statuses:
+                    if status.status == 'success':
+                        res = results_map.get(status.id)
+                        if res:
+                            summary = getattr(res,
'summary', '') + highlights = " ".join(getattr(res, 'highlights', [])) + text_snippet = (getattr(res, 'text', '')[:300] + '...') if getattr(res, 'text', '') else '' + snippet = summary or highlights or text_snippet or '无摘要' + + successful_results.append({ + "title": getattr(res, 'title', '无标题'), + "url": getattr(res, 'url', status.id), + "snippet": snippet, + "source": "exa" + }) + else: + error_tag = getattr(status, 'error', '未知错误') + logger.warning(f"Exa解析URL '{status.id}' 失败: {error_tag}。准备本地重试。") + urls_to_retry_locally.append(status.id) + else: + # 如果Exa未配置、API调用失败或返回无效响应,则所有URL都进入本地重试 + urls_to_retry_locally.extend(url for url in urls if url not in [res['url'] for res in successful_results]) + + # 步骤 3: 对失败的URL进行本地解析 + if urls_to_retry_locally: + logger.info(f"开始本地解析以下URL: {urls_to_retry_locally}") + local_tasks = [self._local_parse_and_summarize(url) for url in urls_to_retry_locally] + local_results = await asyncio.gather(*local_tasks) + + for i, res in enumerate(local_results): + url = urls_to_retry_locally[i] + if "error" in res: + error_messages.append(f"URL: {url} - 解析失败: {res['error']}") + else: + successful_results.append(res) + + if not successful_results: + return {"error": "无法从所有给定的URL获取内容。", "details": error_messages} + + formatted_content = format_url_parse_results(successful_results) + + result = { + "type": "url_parse_result", + "content": formatted_content, + "errors": error_messages + } + + # 保存到缓存 + if "error" not in result: + await tool_cache.set(self.name, function_args, current_file_path, result) + + return result diff --git a/src/plugins/built_in/web_search_tool/tools/web_search.py b/src/plugins/built_in/web_search_tool/tools/web_search.py new file mode 100644 index 000000000..56f70d355 --- /dev/null +++ b/src/plugins/built_in/web_search_tool/tools/web_search.py @@ -0,0 +1,164 @@ +""" +Web search tool implementation +""" +import asyncio +from typing import Any, Dict, List + +from src.common.logger import get_logger +from src.plugin_system import BaseTool, ToolParamType, llm_api +from src.plugin_system.apis import config_api +from src.common.cache_manager import tool_cache + +from ..engines.exa_engine import ExaSearchEngine +from ..engines.tavily_engine import TavilySearchEngine +from ..engines.ddg_engine import DDGSearchEngine +from ..engines.bing_engine import BingSearchEngine +from ..utils.formatters import format_search_results, deduplicate_results + +logger = get_logger("web_search_tool") + + +class WebSurfingTool(BaseTool): + """ + 网络搜索工具 + """ + name: str = "web_search" + description: str = "用于执行网络搜索。当用户明确要求搜索,或者需要获取关于公司、产品、事件的最新信息、新闻或动态时,必须使用此工具" + available_for_llm: bool = True + parameters = [ + ("query", ToolParamType.STRING, "要搜索的关键词或问题。", True, None), + ("num_results", ToolParamType.INTEGER, "期望每个搜索引擎返回的搜索结果数量,默认为5。", False, None), + ("time_range", ToolParamType.STRING, "指定搜索的时间范围,可以是 'any', 'week', 'month'。默认为 'any'。", False, ["any", "week", "month"]) + ] # type: ignore + + def __init__(self, plugin_config=None): + super().__init__(plugin_config) + # 初始化搜索引擎 + self.engines = { + "exa": ExaSearchEngine(), + "tavily": TavilySearchEngine(), + "ddg": DDGSearchEngine(), + "bing": BingSearchEngine() + } + + async def execute(self, function_args: Dict[str, Any]) -> Dict[str, Any]: + query = function_args.get("query") + if not query: + return {"error": "搜索查询不能为空。"} + + # 获取当前文件路径用于缓存键 + import os + current_file_path = os.path.abspath(__file__) + + # 检查缓存 + cached_result = await tool_cache.get(self.name, function_args, current_file_path, semantic_query=query) 
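+        # semantic_query 供缓存做语义匹配(假设:tool_cache 可据此命中措辞相近的历史查询)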
+ if cached_result: + logger.info(f"缓存命中: {self.name} -> {function_args}") + return cached_result + + # 读取搜索配置 + enabled_engines = config_api.get_global_config("web_search.enabled_engines", ["ddg"]) + search_strategy = config_api.get_global_config("web_search.search_strategy", "single") + + logger.info(f"开始搜索,策略: {search_strategy}, 启用引擎: {enabled_engines}, 参数: '{function_args}'") + + # 根据策略执行搜索 + if search_strategy == "parallel": + result = await self._execute_parallel_search(function_args, enabled_engines) + elif search_strategy == "fallback": + result = await self._execute_fallback_search(function_args, enabled_engines) + else: # single + result = await self._execute_single_search(function_args, enabled_engines) + + # 保存到缓存 + if "error" not in result: + await tool_cache.set(self.name, function_args, current_file_path, result, semantic_query=query) + + return result + + async def _execute_parallel_search(self, function_args: Dict[str, Any], enabled_engines: List[str]) -> Dict[str, Any]: + """并行搜索策略:同时使用所有启用的搜索引擎""" + search_tasks = [] + + for engine_name in enabled_engines: + engine = self.engines.get(engine_name) + if engine and engine.is_available(): + custom_args = function_args.copy() + custom_args["num_results"] = custom_args.get("num_results", 5) + search_tasks.append(engine.search(custom_args)) + + if not search_tasks: + return {"error": "没有可用的搜索引擎。"} + + try: + search_results_lists = await asyncio.gather(*search_tasks, return_exceptions=True) + + all_results = [] + for result in search_results_lists: + if isinstance(result, list): + all_results.extend(result) + elif isinstance(result, Exception): + logger.error(f"搜索时发生错误: {result}") + + # 去重并格式化 + unique_results = deduplicate_results(all_results) + formatted_content = format_search_results(unique_results) + + return { + "type": "web_search_result", + "content": formatted_content, + } + + except Exception as e: + logger.error(f"执行并行网络搜索时发生异常: {e}", exc_info=True) + return {"error": f"执行网络搜索时发生严重错误: {str(e)}"} + + async def _execute_fallback_search(self, function_args: Dict[str, Any], enabled_engines: List[str]) -> Dict[str, Any]: + """回退搜索策略:按顺序尝试搜索引擎,失败则尝试下一个""" + for engine_name in enabled_engines: + engine = self.engines.get(engine_name) + if not engine or not engine.is_available(): + continue + + try: + custom_args = function_args.copy() + custom_args["num_results"] = custom_args.get("num_results", 5) + + results = await engine.search(custom_args) + + if results: # 如果有结果,直接返回 + formatted_content = format_search_results(results) + return { + "type": "web_search_result", + "content": formatted_content, + } + + except Exception as e: + logger.warning(f"{engine_name} 搜索失败,尝试下一个引擎: {e}") + continue + + return {"error": "所有搜索引擎都失败了。"} + + async def _execute_single_search(self, function_args: Dict[str, Any], enabled_engines: List[str]) -> Dict[str, Any]: + """单一搜索策略:只使用第一个可用的搜索引擎""" + for engine_name in enabled_engines: + engine = self.engines.get(engine_name) + if not engine or not engine.is_available(): + continue + + try: + custom_args = function_args.copy() + custom_args["num_results"] = custom_args.get("num_results", 5) + + results = await engine.search(custom_args) + formatted_content = format_search_results(results) + return { + "type": "web_search_result", + "content": formatted_content, + } + + except Exception as e: + logger.error(f"{engine_name} 搜索失败: {e}") + return {"error": f"{engine_name} 搜索失败: {str(e)}"} + + return {"error": "没有可用的搜索引擎。"} diff --git a/src/plugins/built_in/web_search_tool/utils/__init__.py 
b/src/plugins/built_in/web_search_tool/utils/__init__.py new file mode 100644 index 000000000..8ebe2c35d --- /dev/null +++ b/src/plugins/built_in/web_search_tool/utils/__init__.py @@ -0,0 +1,3 @@ +""" +Web search tool utilities package +""" diff --git a/src/plugins/built_in/web_search_tool/utils/api_key_manager.py b/src/plugins/built_in/web_search_tool/utils/api_key_manager.py new file mode 100644 index 000000000..f8e0afa71 --- /dev/null +++ b/src/plugins/built_in/web_search_tool/utils/api_key_manager.py @@ -0,0 +1,84 @@ +""" +API密钥管理器,提供轮询机制 +""" +import itertools +from typing import List, Optional, TypeVar, Generic, Callable +from src.common.logger import get_logger + +logger = get_logger("api_key_manager") + +T = TypeVar('T') + + +class APIKeyManager(Generic[T]): + """ + API密钥管理器,支持轮询机制 + """ + + def __init__(self, api_keys: List[str], client_factory: Callable[[str], T], service_name: str = "Unknown"): + """ + 初始化API密钥管理器 + + Args: + api_keys: API密钥列表 + client_factory: 客户端工厂函数,接受API密钥参数并返回客户端实例 + service_name: 服务名称,用于日志记录 + """ + self.service_name = service_name + self.clients: List[T] = [] + self.client_cycle: Optional[itertools.cycle] = None + + if api_keys: + # 过滤有效的API密钥,排除None、空字符串、"None"字符串等 + valid_keys = [] + for key in api_keys: + if isinstance(key, str) and key.strip() and key.strip().lower() not in ("none", "null", ""): + valid_keys.append(key.strip()) + + if valid_keys: + try: + self.clients = [client_factory(key) for key in valid_keys] + self.client_cycle = itertools.cycle(self.clients) + logger.info(f"🔑 {service_name} 成功加载 {len(valid_keys)} 个 API 密钥") + except Exception as e: + logger.error(f"❌ 初始化 {service_name} 客户端失败: {e}") + self.clients = [] + self.client_cycle = None + else: + logger.warning(f"⚠️ {service_name} API Keys 配置无效(包含None或空值),{service_name} 功能将不可用") + else: + logger.warning(f"⚠️ {service_name} API Keys 未配置,{service_name} 功能将不可用") + + def is_available(self) -> bool: + """检查是否有可用的客户端""" + return bool(self.clients and self.client_cycle) + + def get_next_client(self) -> Optional[T]: + """获取下一个客户端(轮询)""" + if not self.is_available(): + return None + return next(self.client_cycle) + + def get_client_count(self) -> int: + """获取可用客户端数量""" + return len(self.clients) + + +def create_api_key_manager_from_config( + config_keys: Optional[List[str]], + client_factory: Callable[[str], T], + service_name: str +) -> APIKeyManager[T]: + """ + 从配置创建API密钥管理器的便捷函数 + + Args: + config_keys: 从配置读取的API密钥列表 + client_factory: 客户端工厂函数 + service_name: 服务名称 + + Returns: + API密钥管理器实例 + """ + api_keys = config_keys if isinstance(config_keys, list) else [] + return APIKeyManager(api_keys, client_factory, service_name) diff --git a/src/plugins/built_in/web_search_tool/utils/formatters.py b/src/plugins/built_in/web_search_tool/utils/formatters.py new file mode 100644 index 000000000..434f6f3c8 --- /dev/null +++ b/src/plugins/built_in/web_search_tool/utils/formatters.py @@ -0,0 +1,57 @@ +""" +Formatters for web search results +""" +from typing import List, Dict, Any + + +def format_search_results(results: List[Dict[str, Any]]) -> str: + """ + 格式化搜索结果为字符串 + """ + if not results: + return "没有找到相关的网络信息。" + + formatted_string = "根据网络搜索结果:\n\n" + for i, res in enumerate(results, 1): + title = res.get("title", '无标题') + url = res.get("url", '#') + snippet = res.get("snippet", '无摘要') + provider = res.get("provider", "未知来源") + + formatted_string += f"{i}. 
**{title}** (来自: {provider})\n" + formatted_string += f" - 摘要: {snippet}\n" + formatted_string += f" - 来源: {url}\n\n" + + return formatted_string + + +def format_url_parse_results(results: List[Dict[str, Any]]) -> str: + """ + 将成功解析的URL结果列表格式化为一段简洁的文本。 + """ + formatted_parts = [] + for res in results: + title = res.get('title', '无标题') + url = res.get('url', '#') + snippet = res.get('snippet', '无摘要') + source = res.get('source', '未知') + + formatted_string = f"**{title}**\n" + formatted_string += f"**内容摘要**:\n{snippet}\n" + formatted_string += f"**来源**: {url} (由 {source} 解析)\n" + formatted_parts.append(formatted_string) + + return "\n---\n".join(formatted_parts) + + +def deduplicate_results(results: List[Dict[str, Any]]) -> List[Dict[str, Any]]: + """ + 根据URL去重搜索结果 + """ + unique_urls = set() + unique_results = [] + for res in results: + if isinstance(res, dict) and res.get("url") and res["url"] not in unique_urls: + unique_urls.add(res["url"]) + unique_results.append(res) + return unique_results diff --git a/src/plugins/built_in/web_search_tool/utils/url_utils.py b/src/plugins/built_in/web_search_tool/utils/url_utils.py new file mode 100644 index 000000000..74afbc819 --- /dev/null +++ b/src/plugins/built_in/web_search_tool/utils/url_utils.py @@ -0,0 +1,39 @@ +""" +URL processing utilities +""" +import re +from typing import List + + +def parse_urls_from_input(urls_input) -> List[str]: + """ + 从输入中解析URL列表 + """ + if isinstance(urls_input, str): + # 如果是字符串,尝试解析为URL列表 + # 提取所有HTTP/HTTPS URL + url_pattern = r'https?://[^\s\],]+' + urls = re.findall(url_pattern, urls_input) + if not urls: + # 如果没有找到标准URL,将整个字符串作为单个URL + if urls_input.strip().startswith(('http://', 'https://')): + urls = [urls_input.strip()] + else: + return [] + elif isinstance(urls_input, list): + urls = [url.strip() for url in urls_input if isinstance(url, str) and url.strip()] + else: + return [] + + return urls + + +def validate_urls(urls: List[str]) -> List[str]: + """ + 验证URL格式,返回有效的URL列表 + """ + valid_urls = [] + for url in urls: + if url.startswith(('http://', 'https://')): + valid_urls.append(url) + return valid_urls
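+
+
+# 用法示意(假设 urls 参数由 LLM 以自然语言字符串传入):
+# >>> parse_urls_from_input("帮我看看 https://example.com/a, https://example.org 这两个页面")
+# ['https://example.com/a', 'https://example.org']
+# >>> validate_urls(["https://example.com/a", "ftp://example.net"])
+# ['https://example.com/a']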