minecraft1024a
2025-08-22 12:17:16 +08:00
28 changed files with 1368 additions and 1113 deletions

View File

@@ -18,10 +18,6 @@ logger = get_logger("anti_injector.counter_attack")
class CounterAttackGenerator:
"""反击消息生成器"""
def __init__(self):
"""初始化反击消息生成器"""
pass
def get_personality_context(self) -> str:
"""获取人格上下文信息

View File

@@ -18,9 +18,6 @@ logger = get_logger("anti_injector.counter_attack")
class CounterAttackGenerator:
"""反击消息生成器"""
def __init__(self):
"""初始化反击消息生成器"""
pass
def get_personality_context(self) -> str:
"""获取人格上下文信息

View File

@@ -17,10 +17,6 @@ logger = get_logger("anti_injector.message_processor")
class MessageProcessor:
"""消息内容处理器"""
def __init__(self):
"""初始化消息处理器"""
pass
def extract_text_content(self, message: MessageRecv) -> str:
"""提取消息中的文本内容,过滤掉引用的历史内容

View File

@@ -144,8 +144,7 @@ class EmbeddingStore:
# 确保事件循环被正确关闭
try:
loop.close()
except Exception:
pass
except Exception: ...
def _get_embeddings_batch_threaded(self, strs: List[str], chunk_size: int = 10, max_workers: int = 10, progress_callback=None) -> List[Tuple[str, List[float]]]:
"""使用多线程批量获取嵌入向量

View File

@@ -58,8 +58,7 @@ def fix_broken_generated_json(json_str: str) -> str:
# Try to load the JSON to see if it is valid
json.loads(json_str)
return json_str # Return as-is if valid
except json.JSONDecodeError:
pass
except json.JSONDecodeError: ...
# Step 1: Remove trailing content after the last comma.
last_comma_index = json_str.rfind(",")
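The surrounding function keeps JSON that already parses and otherwise trims the dangling fragment after the last comma before re-closing the object. A minimal sketch of that repair strategy for flat, truncated objects (arrays and nested edge cases are not handled here):

import json


def fix_truncated_json(json_str: str) -> str:
    # Return untouched when the string already parses.
    try:
        json.loads(json_str)
        return json_str
    except json.JSONDecodeError:
        ...
    # Drop the dangling fragment after the last comma, then re-close the object.
    last_comma = json_str.rfind(",")
    if last_comma != -1:
        json_str = json_str[:last_comma]
    open_braces = json_str.count("{") - json_str.count("}")
    return json_str + "}" * max(open_braces, 0)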

View File

@@ -25,14 +25,6 @@ class MemoryItem:
self.keywords: list[str] = keywords
self.create_time: float = time.time()
self.last_view_time: float = time.time()
class MemoryManager:
def __init__(self):
# self.memory_items:list[MemoryItem] = []
pass
class InstantMemory:
def __init__(self, chat_id):
self.chat_id = chat_id
@@ -219,14 +211,12 @@ class InstantMemory:
try:
dt = datetime.strptime(time_str, "%Y-%m-%d %H:%M:%S")
return dt, dt + timedelta(hours=1)
except Exception:
pass
except Exception: ...
# 具体日期
try:
dt = datetime.strptime(time_str, "%Y-%m-%d")
return dt, dt + timedelta(days=1)
except Exception:
pass
except Exception: ...
# 相对时间
if time_str == "今天":
start = now.replace(hour=0, minute=0, second=0, microsecond=0)
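The parser above cascades from an exact timestamp to a bare date and finally to relative words such as 今天 (today). A compact sketch of that cascade returning a (start, end) window; the end bound for the relative case is an assumption:

from datetime import datetime, timedelta
from typing import Optional, Tuple


def parse_time_range(time_str: str, now: Optional[datetime] = None) -> Optional[Tuple[datetime, datetime]]:
    now = now or datetime.now()
    # Exact timestamp: cover the following hour.
    try:
        dt = datetime.strptime(time_str, "%Y-%m-%d %H:%M:%S")
        return dt, dt + timedelta(hours=1)
    except ValueError:
        ...
    # Bare date: cover the whole day.
    try:
        dt = datetime.strptime(time_str, "%Y-%m-%d")
        return dt, dt + timedelta(days=1)
    except ValueError:
        ...
    # Relative keyword: start at local midnight; ending at "now" is an illustrative choice.
    if time_str == "今天":
        start = now.replace(hour=0, minute=0, second=0, microsecond=0)
        return start, now
    return None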

View File

@@ -170,7 +170,7 @@ class ChatBot:
logger.error(f"处理命令时出错: {e}")
return False, None, True # 出错时继续处理消息
async def hanle_notice_message(self, message: MessageRecv):
async def handle_notice_message(self, message: MessageRecv):
if message.message_info.message_id == "notice":
message.is_notify = True
logger.info("notice消息")
@@ -265,9 +265,7 @@ class ChatBot:
# logger.debug(str(message_data))
message = MessageRecv(message_data)
if await self.hanle_notice_message(message):
# return
pass
if await self.handle_notice_message(message): ...
group_info = message.message_info.group_info
user_info = message.message_info.user_info

View File

@@ -791,9 +791,8 @@ def build_pic_mapping_info(pic_id_mapping: Dict[str, str]) -> str:
image = session.execute(select(Images).where(Images.image_id == pic_id)).scalar()
if image and image.description:
description = image.description
except Exception:
except Exception: ...
# 如果查询失败,保持默认描述
pass
mapping_lines.append(f"[{display_name}] 的内容:{description}")
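build_pic_mapping_info looks each picture id up in the Images table and keeps a default description when the row is missing or the query fails. A minimal sketch of that lookup, using a stand-in SQLAlchemy model in place of the project's real Images class and an assumed default text:

from typing import Dict, List

from sqlalchemy import String, select
from sqlalchemy.orm import DeclarativeBase, Mapped, Session, mapped_column


class Base(DeclarativeBase):
    ...


class Images(Base):  # Minimal stand-in for the project's Images model.
    __tablename__ = "images"
    image_id: Mapped[str] = mapped_column(String, primary_key=True)
    description: Mapped[str] = mapped_column(String, nullable=True)


def build_pic_mapping_lines(session: Session, pic_id_mapping: Dict[str, str]) -> List[str]:
    lines: List[str] = []
    for pic_id, display_name in pic_id_mapping.items():
        description = "图片内容未知"  # Assumed default used when the lookup fails or is empty.
        try:
            image = session.execute(select(Images).where(Images.image_id == pic_id)).scalar()
            if image and image.description:
                description = image.description
        except Exception:
            ...  # Keep the default description on any query error.
        lines.append(f"[{display_name}] 的内容:{description}")
    return lines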

View File

@@ -70,8 +70,8 @@ class PromptContext:
# 如果reset失败尝试直接设置
try:
self._current_context = previous_context
except Exception:
pass # 静默忽略恢复失败
except Exception: ...
# 静默忽略恢复失败
async def get_prompt_async(self, name: str) -> Optional["Prompt"]:
"""异步获取当前作用域中的提示模板"""

View File

@@ -381,8 +381,7 @@ class ImageManager:
# 确保是RGB格式方便比较
frame = gif.convert("RGB")
all_frames.append(frame.copy())
except EOFError:
pass # 读完啦
except EOFError: ... # 读完啦
if not all_frames:
logger.warning("GIF中没有找到任何帧")

View File

@@ -44,7 +44,7 @@ class ClassicalWillingManager(BaseWillingManager):
return reply_probability
async def before_generate_reply_handle(self, message_id):
async def before_generate_reply_handle(self, message_id):
pass
async def after_generate_reply_handle(self, message_id):

View File

@@ -68,7 +68,7 @@ TEMPLATE_DIR = os.path.join(PROJECT_ROOT, "template")
# 考虑到实际上配置文件中的mai_version是不会自动更新的,所以采用硬编码
# 对该字段的更新请严格参照语义化版本规范https://semver.org/lang/zh-CN/
MMC_VERSION = "0.10.0-snapshot.5"
MMC_VERSION = "0.10.0-alpha-1"
def get_key_comment(toml_table, key):

View File

@@ -1,436 +0,0 @@
from src.common.logger import get_logger
from bs4 import BeautifulSoup
import requests
import random
import os
import traceback
logger = get_logger("web_surfing_tool")
ABSTRACT_MAX_LENGTH = 300 # abstract max length
user_agents = [
# Edge浏览器
"Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/122.0.0.0 Safari/537.36 Edg/122.0.0.0",
"Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/121.0.0.0 Safari/537.36 Edg/121.0.0.0",
"Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/122.0.0.0 Safari/537.36 Edg/122.0.0.0",
# Chrome浏览器
"Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/122.0.0.0 Safari/537.36",
"Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/121.0.0.0 Safari/537.36",
"Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/122.0.0.0 Safari/537.36",
# Firefox浏览器
"Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:123.0) Gecko/20100101 Firefox/123.0",
"Mozilla/5.0 (Macintosh; Intel Mac OS X 10.15; rv:123.0) Gecko/20100101 Firefox/123.0",
"Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:122.0) Gecko/20100101 Firefox/122.0",
# Safari浏览器
"Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/16.6 Safari/605.1.15",
"Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/17.0 Safari/605.1.15",
# 移动端浏览器
"Mozilla/5.0 (iPhone; CPU iPhone OS 17_4 like Mac OS X) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/17.0 Mobile/15E148 Safari/604.1",
"Mozilla/5.0 (iPad; CPU OS 17_4 like Mac OS X) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/17.0 Mobile/15E148 Safari/604.1",
"Mozilla/5.0 (Linux; Android 14; SM-S918B) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/122.0.0.0 Mobile Safari/537.36",
# 搜索引擎爬虫 (模拟)
"Mozilla/5.0 (compatible; Googlebot/2.1; +http://www.google.com/bot.html)",
"Mozilla/5.0 (compatible; Bingbot/2.0; +http://www.bing.com/bingbot.htm)",
"Mozilla/5.0 (compatible; YandexBot/3.0; +http://yandex.com/bots)",
]
# 请求头信息
HEADERS = {
"Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.7",
"Accept-Encoding": "gzip, deflate, br",
"Accept-Language": "zh-CN,zh;q=0.9,en;q=0.8,en-GB;q=0.7,en-US;q=0.6",
"Cache-Control": "max-age=0",
"Connection": "keep-alive",
"Host": "www.bing.com",
"Referer": "https://www.bing.com/",
"Sec-Ch-Ua": '"Chromium";v="122", "Microsoft Edge";v="122", "Not-A.Brand";v="99"',
"Sec-Ch-Ua-Mobile": "?0",
"Sec-Ch-Ua-Platform": '"Windows"',
"Sec-Fetch-Dest": "document",
"Sec-Fetch-Mode": "navigate",
"Sec-Fetch-Site": "same-origin",
"Sec-Fetch-User": "?1",
"Upgrade-Insecure-Requests": "1",
"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/122.0.0.0 Safari/537.36 Edg/122.0.0.0",
}
# 替代的中国区必应请求头
CN_BING_HEADERS = {
"Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.7",
"Accept-Encoding": "gzip, deflate, br",
"Accept-Language": "zh-CN,zh;q=0.9,en;q=0.8,en-GB;q=0.7,en-US;q=0.6",
"Cache-Control": "max-age=0",
"Connection": "keep-alive",
"Host": "cn.bing.com",
"Referer": "https://cn.bing.com/",
"Sec-Ch-Ua": '"Chromium";v="122", "Microsoft Edge";v="122", "Not-A.Brand";v="99"',
"Sec-Ch-Ua-Mobile": "?0",
"Sec-Ch-Ua-Platform": '"Windows"',
"Sec-Fetch-Dest": "document",
"Sec-Fetch-Mode": "navigate",
"Sec-Fetch-Site": "same-origin",
"Sec-Fetch-User": "?1",
"Upgrade-Insecure-Requests": "1",
"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/122.0.0.0 Safari/537.36 Edg/122.0.0.0",
}
bing_host_url = "https://www.bing.com"
bing_search_url = "https://www.bing.com/search?q="
cn_bing_host_url = "https://cn.bing.com"
cn_bing_search_url = "https://cn.bing.com/search?q="
class BingSearch:
session = requests.Session()
session.headers = HEADERS
def search(self, keyword, num_results=10):
"""
通过关键字进行搜索
:param keyword: 关键字
:param num_results 指定返回的结果个数
:return: 结果列表
"""
if not keyword:
return None
list_result = []
page = 1
# 起始搜索的url
next_url = bing_search_url + keyword
# 循环遍历每一页的搜索结果并返回下一页的url
while len(list_result) < num_results:
data, next_url = self.parse_html(next_url, rank_start=len(list_result))
if data:
list_result += data
logger.debug(
"---searching[{}], finish parsing page {}, results number={}: ".format(keyword, page, len(data))
)
for d in data:
logger.debug(str(d))
if not next_url:
logger.debug("already search the last page。")
break
page += 1
logger.debug("\n---search [{}] finished. total results number={}".format(keyword, len(list_result)))
return list_result[:num_results] if len(list_result) > num_results else list_result
def parse_html(self, url, rank_start=0, debug=0):
"""
解析处理结果
:param url: 需要抓取的 url
:return: 结果列表下一页的url
"""
try:
logger.debug("--search_bing-------url: {}".format(url))
# 确定是国际版还是中国版必应
is_cn_bing = "cn.bing.com" in url
# 保存当前URL以便调试
query_part = url.split("?q=")[1] if "?q=" in url else "unknown_query"
debug_filename = f"debug/bing_{'cn' if is_cn_bing else 'www'}_search_{query_part[:30]}.html"
# 设置必要的Cookie
cookies = {
"SRCHHPGUSR": "SRCHLANG=zh-Hans", # 设置默认搜索语言为中文
"SRCHD": "AF=NOFORM",
"SRCHUID": "V=2&GUID=1A4D4F1C8844493F9A2E3DB0D1BC806C",
"_SS": "SID=0D89D9A3C95C60B62E7AC80CC85461B3",
"_EDGE_S": "ui=zh-cn", # 设置界面语言为中文
"_EDGE_V": "1",
}
# 使用适当的请求头
# 为每次请求随机选择不同的用户代理,降低被屏蔽风险
headers = CN_BING_HEADERS.copy() if is_cn_bing else HEADERS.copy()
headers["User-Agent"] = random.choice(user_agents)
# 为不同域名使用不同的Session避免Cookie污染
session = requests.Session()
session.headers.update(headers)
session.cookies.update(cookies)
# 添加超时和重试,降低超时时间并允许重试
try:
res = session.get(
url=url, timeout=(3.05, 6), verify=True, allow_redirects=True
) # 超时分别为连接超时和读取超时
except (requests.exceptions.Timeout, requests.exceptions.ConnectionError) as e:
# 如果第一次尝试超时,使用更宽松的设置再试一次
logger.warning(f"第一次请求超时,正在重试: {str(e)}")
try:
# 第二次尝试使用更长的超时时间
res = session.get(url=url, timeout=(5, 10), verify=False) # 忽略SSL验证
except Exception as e2:
logger.error(f"第二次请求也失败: {str(e2)}")
# 如果所有尝试都失败,返回空结果
return [], None
res.encoding = "utf-8"
# 保存响应内容以便调试
os.makedirs("debug", exist_ok=True)
with open(debug_filename, "w", encoding="utf-8") as f:
f.write(res.text)
# 检查响应状态
logger.debug(f"--search_bing-------status_code: {res.status_code}")
if res.status_code == 403:
logger.error("被禁止访问 (403 Forbidden)可能是IP被限制")
# 如果被禁止,返回空结果
return [], None
if res.status_code != 200:
logger.error(f"必应搜索请求失败,状态码: {res.status_code}")
return None, None
# 检查是否被重定向到登录页面或验证页面
if "login.live.com" in res.url or "login.microsoftonline.com" in res.url:
logger.error("被重定向到登录页面,可能需要登录")
return None, None
if "https://www.bing.com/ck/a" in res.url:
logger.error("被重定向到验证页面,可能被识别为机器人")
return None, None
# 解析HTML - 添加对多种解析器的支持
try:
# 首先尝试使用lxml解析器
root = BeautifulSoup(res.text, "lxml")
except Exception as e:
logger.warning(f"lxml解析器不可用: {str(e)}尝试使用html.parser")
try:
# 如果lxml不可用使用内置解析器
root = BeautifulSoup(res.text, "html.parser")
except Exception as e2:
logger.error(f"HTML解析失败: {str(e2)}")
return None, None
# 保存解析结果的一小部分用于调试
sample_html = str(root)[:1000] if root else ""
logger.debug(f"HTML解析结果示例: {sample_html}")
list_data = []
# 确保我们能获取到内容 - 先尝试直接提取链接
all_links = root.find_all("a")
# 记录链接总数,帮助诊断
logger.debug(f"页面中总共找到了 {len(all_links)} 个链接")
# 保存一些链接示例到日志
sample_links = []
for i, link in enumerate(all_links):
if i < 10: # 只记录前10个链接
sample_links.append({"text": link.text.strip(), "href": link.get("href", "")})
logger.debug(f"链接示例: {sample_links}")
# 方法0查找动态提取的结果
# 尝试查找包含完整结果项的父容器
result_containers = []
# 一些可能的结果容器选择器
container_selectors = [
"ol#b_results",
"div.b_searchResults",
"div#b_content",
"div.srchrslt_main",
"div.mspg_cont",
"div.ms-srchResult-results",
"div#ContentAll",
"div.resultlist",
]
for selector in container_selectors:
containers = root.select(selector)
if containers:
logger.debug(f"找到可能的结果容器: {selector}, 数量: {len(containers)}")
result_containers.extend(containers)
# 如果找到容器,尝试在容器中寻找有价值的链接
extracted_items = []
if result_containers:
for container in result_containers:
# 查找标题元素h1, h2, h3, h4
for heading in container.find_all(["h1", "h2", "h3", "h4", "strong", "b"]):
# 如果标题元素包含链接,这很可能是搜索结果的标题
link = heading.find("a")
if link and link.get("href") and link.text.strip():
url = link.get("href")
title = link.text.strip()
# 如果是有效的外部链接
if (
not url.startswith("javascript:")
and not url.startswith("#")
and not any(x in url for x in ["bing.com/search", "bing.com/images"])
):
# 查找摘要:尝试找到相邻的段落元素
abstract = ""
# 尝试在标题后面查找摘要
next_elem = heading.next_sibling
while next_elem and not abstract:
if hasattr(next_elem, "name") and next_elem.name in ["p", "div", "span"]:
abstract = next_elem.text.strip()
break
next_elem = next_elem.next_sibling
# 如果没找到,尝试在父元素内查找其他段落
if not abstract:
parent = heading.parent
for p in parent.find_all(
["p", "div"],
class_=lambda c: c
and any(
x in str(c) for x in ["desc", "abstract", "snippet", "caption", "summary"]
),
):
if p != heading:
abstract = p.text.strip()
break
# 创建结果项
extracted_items.append(
{
"title": title,
"url": url,
"abstract": abstract,
}
)
logger.debug(f"提取到搜索结果: {title}")
# 如果找到了结果,添加到列表
if extracted_items:
for rank, item in enumerate(extracted_items, start=rank_start + 1):
# 裁剪摘要长度
abstract = item["abstract"]
if ABSTRACT_MAX_LENGTH and len(abstract) > ABSTRACT_MAX_LENGTH:
abstract = abstract[:ABSTRACT_MAX_LENGTH]
list_data.append({"title": item["title"], "abstract": abstract, "url": item["url"], "rank": rank})
logger.debug(f"从容器中提取了 {len(list_data)} 个搜索结果")
if list_data:
return list_data, None
# 如果上面的方法没有找到结果,尝试通用链接提取
valid_links = []
for link in all_links:
href = link.get("href", "")
text = link.text.strip()
# 有效的搜索结果链接通常有这些特点
if (
href
and text
and len(text) > 10 # 标题通常比较长
and not href.startswith("javascript:")
and not href.startswith("#")
and not any(
x in href
for x in [
"bing.com/search",
"bing.com/images",
"bing.com/videos",
"bing.com/maps",
"bing.com/news",
"login",
"account",
"javascript",
"about.html",
"help.html",
"microsoft",
]
)
and "http" in href
): # 必须是有效URL
valid_links.append(link)
# 按文本长度排序,更长的文本更可能是搜索结果标题
valid_links.sort(key=lambda x: len(x.text.strip()), reverse=True)
if valid_links:
logger.debug(f"找到 {len(valid_links)} 个可能的搜索结果链接")
# 提取前10个作为搜索结果
for rank, link in enumerate(valid_links[:10], start=rank_start + 1):
href = link.get("href", "")
text = link.text.strip()
# 获取摘要
abstract = ""
# 尝试获取父元素的文本作为摘要
parent = link.parent
if parent and parent.text:
full_text = parent.text.strip()
if len(full_text) > len(text):
abstract = full_text.replace(text, "", 1).strip()
# 如果没有找到好的摘要,尝试查找相邻元素
if len(abstract) < 20:
next_elem = link.next_sibling
while next_elem and len(abstract) < 20:
if hasattr(next_elem, "text") and next_elem.text.strip():
abstract = next_elem.text.strip()
break
next_elem = next_elem.next_sibling
# 裁剪摘要长度
if ABSTRACT_MAX_LENGTH and len(abstract) > ABSTRACT_MAX_LENGTH:
abstract = abstract[:ABSTRACT_MAX_LENGTH]
list_data.append({"title": text, "abstract": abstract, "url": href, "rank": rank})
logger.debug(f"提取到备选搜索结果 #{rank}: {text}")
# 如果找到了结果,返回
if list_data:
logger.debug(f"通过备选方法提取了 {len(list_data)} 个搜索结果")
return list_data, None
# 检查是否有错误消息
error_msg = root.find("div", class_="b_searcherrmsg")
if error_msg:
logger.error(f"必应搜索返回错误: {error_msg.text.strip()}")
# 找到下一页按钮 (尝试多种可能的选择器)
next_url = None
# 方式1: 标准下一页按钮
pagination_classes = ["b_widePag sb_bp", "b_pag"]
for cls in pagination_classes:
next_page = root.find("a", class_=cls)
if next_page and any(txt in next_page.text for txt in ["下一页", "Next", "下页"]):
next_url = next_page.get("href", "")
if next_url and not next_url.startswith("http"):
next_url = (cn_bing_host_url if is_cn_bing else bing_host_url) + next_url
break
# 方式2: 备用下一页按钮
if not next_url:
pagination = root.find_all("a", class_="sb_pagN")
if pagination:
next_url = pagination[0].get("href", "")
if next_url and not next_url.startswith("http"):
next_url = (cn_bing_host_url if is_cn_bing else bing_host_url) + next_url
# 方式3: 通用导航元素
if not next_url:
nav_links = root.find_all("a")
for link in nav_links:
if link.text.strip() in ["下一页", "Next", "下页", "»", ">>"]:
next_url = link.get("href", "")
if next_url and not next_url.startswith("http"):
next_url = (cn_bing_host_url if is_cn_bing else bing_host_url) + next_url
break
logger.debug(f"已解析 {len(list_data)} 个结果,下一页链接: {next_url}")
return list_data, next_url
except Exception as e:
logger.error(f"解析页面时出错: {str(e)}")
logger.debug(traceback.format_exc())
return None, None

View File

@@ -1,654 +1,81 @@
import asyncio
import functools
import itertools
from typing import Any, Dict, List
from datetime import datetime, timedelta
from exa_py import Exa
from asyncddgs import aDDGS
from tavily import TavilyClient
from .bing_search import BingSearch
"""
Web Search Tool Plugin
一个功能强大的网络搜索和URL解析插件支持多种搜索引擎和解析策略。
"""
from typing import List, Tuple, Type
from src.common.logger import get_logger
from typing import Tuple,Type
from src.plugin_system import (
BasePlugin,
register_plugin,
BaseTool,
ComponentInfo,
ConfigField,
llm_api,
ToolParamType,
PythonDependency
)
from src.plugin_system.apis import config_api # 添加config_api导入
from src.common.cache_manager import tool_cache
import httpx
from bs4 import BeautifulSoup
from src.plugin_system.apis import config_api
from src.common.logger import get_logger
logger = get_logger("web_surfing_tool")
from .tools.web_search import WebSurfingTool
from .tools.url_parser import URLParserTool
logger = get_logger("web_search_plugin")
class WebSurfingTool(BaseTool):
name: str = "web_search"
description: str = "用于执行网络搜索。当用户明确要求搜索,或者需要获取关于公司、产品、事件的最新信息、新闻或动态时,必须使用此工具"
available_for_llm: bool = True
parameters = [
("query", ToolParamType.STRING, "要搜索的关键词或问题。", True, None),
("num_results", ToolParamType.INTEGER, "期望每个搜索引擎返回的搜索结果数量默认为5。", False, None),
("time_range", ToolParamType.STRING, "指定搜索的时间范围,可以是 'any', 'week', 'month'。默认为 'any'", False, ["any", "week", "month"])
] # type: ignore
def __init__(self, plugin_config=None):
super().__init__(plugin_config)
self.bing_search = BingSearch()
# 初始化EXA API密钥轮询器
self.exa_clients = []
self.exa_key_cycle = None
# 优先从主配置文件读取,如果没有则从插件配置文件读取
EXA_API_KEYS = config_api.get_global_config("exa.api_keys", None)
if EXA_API_KEYS is None:
# 从插件配置文件读取
EXA_API_KEYS = self.get_config("exa.api_keys", [])
if isinstance(EXA_API_KEYS, list) and EXA_API_KEYS:
valid_keys = [key.strip() for key in EXA_API_KEYS if isinstance(key, str) and key.strip() not in ("None", "")]
if valid_keys:
self.exa_clients = [Exa(api_key=key) for key in valid_keys]
self.exa_key_cycle = itertools.cycle(self.exa_clients)
logger.info(f"已配置 {len(valid_keys)} 个 Exa API 密钥")
else:
logger.warning("Exa API Keys 配置无效Exa 搜索功能将不可用。")
else:
logger.warning("Exa API Keys 未配置Exa 搜索功能将不可用。")
# 初始化Tavily API密钥轮询器
self.tavily_clients = []
self.tavily_key_cycle = None
# 优先从主配置文件读取,如果没有则从插件配置文件读取
TAVILY_API_KEYS = config_api.get_global_config("tavily.api_keys", None)
if TAVILY_API_KEYS is None:
# 从插件配置文件读取
TAVILY_API_KEYS = self.get_config("tavily.api_keys", [])
if isinstance(TAVILY_API_KEYS, list) and TAVILY_API_KEYS:
valid_keys = [key.strip() for key in TAVILY_API_KEYS if isinstance(key, str) and key.strip() not in ("None", "")]
if valid_keys:
self.tavily_clients = [TavilyClient(api_key=key) for key in valid_keys]
self.tavily_key_cycle = itertools.cycle(self.tavily_clients)
logger.info(f"已配置 {len(valid_keys)} 个 Tavily API 密钥")
else:
logger.warning("Tavily API Keys 配置无效Tavily 搜索功能将不可用。")
else:
logger.warning("Tavily API Keys 未配置Tavily 搜索功能将不可用。")
async def execute(self, function_args: Dict[str, Any]) -> Dict[str, Any]:
query = function_args.get("query")
if not query:
return {"error": "搜索查询不能为空。"}
# 获取当前文件路径用于缓存键
import os
current_file_path = os.path.abspath(__file__)
# 检查缓存
query = function_args.get("query")
cached_result = await tool_cache.get(self.name, function_args, current_file_path, semantic_query=query)
if cached_result:
logger.info(f"缓存命中: {self.name} -> {function_args}")
return cached_result
# 读取搜索配置
enabled_engines = config_api.get_global_config("web_search.enabled_engines", ["ddg"])
search_strategy = config_api.get_global_config("web_search.search_strategy", "single")
logger.info(f"开始搜索,策略: {search_strategy}, 启用引擎: {enabled_engines}, 参数: '{function_args}'")
# 根据策略执行搜索
if search_strategy == "parallel":
result = await self._execute_parallel_search(function_args, enabled_engines)
elif search_strategy == "fallback":
result = await self._execute_fallback_search(function_args, enabled_engines)
else: # single
result = await self._execute_single_search(function_args, enabled_engines)
# 保存到缓存
if "error" not in result:
query = function_args.get("query")
await tool_cache.set(self.name, function_args, current_file_path, result, semantic_query=query)
return result
async def _execute_parallel_search(self, function_args: Dict[str, Any], enabled_engines: List[str]) -> Dict[str, Any]:
"""并行搜索策略:同时使用所有启用的搜索引擎"""
search_tasks = []
for engine in enabled_engines:
custom_args = function_args.copy()
custom_args["num_results"] = custom_args.get("num_results", 5)
if engine == "exa" and self.exa_clients:
# 使用参数中的数量如果没有则默认5个
search_tasks.append(self._search_exa(custom_args))
elif engine == "tavily" and self.tavily_clients:
search_tasks.append(self._search_tavily(custom_args))
elif engine == "ddg":
search_tasks.append(self._search_ddg(custom_args))
elif engine == "bing":
search_tasks.append(self._search_bing(custom_args))
if not search_tasks:
return {"error": "没有可用的搜索引擎。"}
try:
search_results_lists = await asyncio.gather(*search_tasks, return_exceptions=True)
all_results = []
for result in search_results_lists:
if isinstance(result, list):
all_results.extend(result)
elif isinstance(result, Exception):
logger.error(f"搜索时发生错误: {result}")
# 去重并格式化
unique_results = self._deduplicate_results(all_results)
formatted_content = self._format_results(unique_results)
return {
"type": "web_search_result",
"content": formatted_content,
}
except Exception as e:
logger.error(f"执行并行网络搜索时发生异常: {e}", exc_info=True)
return {"error": f"执行网络搜索时发生严重错误: {str(e)}"}
async def _execute_fallback_search(self, function_args: Dict[str, Any], enabled_engines: List[str]) -> Dict[str, Any]:
"""回退搜索策略:按顺序尝试搜索引擎,失败则尝试下一个"""
for engine in enabled_engines:
try:
custom_args = function_args.copy()
custom_args["num_results"] = custom_args.get("num_results", 5)
if engine == "exa" and self.exa_clients:
results = await self._search_exa(custom_args)
elif engine == "tavily" and self.tavily_clients:
results = await self._search_tavily(custom_args)
elif engine == "ddg":
results = await self._search_ddg(custom_args)
elif engine == "bing":
results = await self._search_bing(custom_args)
else:
continue
if results: # 如果有结果,直接返回
formatted_content = self._format_results(results)
return {
"type": "web_search_result",
"content": formatted_content,
}
except Exception as e:
logger.warning(f"{engine} 搜索失败,尝试下一个引擎: {e}")
continue
return {"error": "所有搜索引擎都失败了。"}
async def _execute_single_search(self, function_args: Dict[str, Any], enabled_engines: List[str]) -> Dict[str, Any]:
"""单一搜索策略:只使用第一个可用的搜索引擎"""
for engine in enabled_engines:
custom_args = function_args.copy()
custom_args["num_results"] = custom_args.get("num_results", 5)
try:
if engine == "exa" and self.exa_clients:
results = await self._search_exa(custom_args)
elif engine == "tavily" and self.tavily_clients:
results = await self._search_tavily(custom_args)
elif engine == "ddg":
results = await self._search_ddg(custom_args)
elif engine == "bing":
results = await self._search_bing(custom_args)
else:
continue
formatted_content = self._format_results(results)
return {
"type": "web_search_result",
"content": formatted_content,
}
except Exception as e:
logger.error(f"{engine} 搜索失败: {e}")
return {"error": f"{engine} 搜索失败: {str(e)}"}
return {"error": "没有可用的搜索引擎。"}
def _deduplicate_results(self, results: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
unique_urls = set()
unique_results = []
for res in results:
if isinstance(res, dict) and res.get("url") and res["url"] not in unique_urls:
unique_urls.add(res["url"])
unique_results.append(res)
return unique_results
async def _search_exa(self, args: Dict[str, Any]) -> List[Dict[str, Any]]:
query = args["query"]
num_results = args.get("num_results", 3)
time_range = args.get("time_range", "any")
exa_args = {"num_results": num_results, "text": True, "highlights": True}
if time_range != "any":
today = datetime.now()
start_date = today - timedelta(days=7 if time_range == "week" else 30)
exa_args["start_published_date"] = start_date.strftime('%Y-%m-%d')
try:
if not self.exa_key_cycle:
return []
# 使用轮询机制获取下一个客户端
exa_client = next(self.exa_key_cycle)
loop = asyncio.get_running_loop()
func = functools.partial(exa_client.search_and_contents, query, **exa_args)
search_response = await loop.run_in_executor(None, func)
return [
{
"title": res.title,
"url": res.url,
"snippet": " ".join(getattr(res, 'highlights', [])) or (getattr(res, 'text', '')[:250] + '...'),
"provider": "Exa"
}
for res in search_response.results
]
except Exception as e:
logger.error(f"Exa 搜索失败: {e}")
return []
async def _search_tavily(self, args: Dict[str, Any]) -> List[Dict[str, Any]]:
query = args["query"]
num_results = args.get("num_results", 3)
time_range = args.get("time_range", "any")
try:
if not self.tavily_key_cycle:
return []
# 使用轮询机制获取下一个客户端
tavily_client = next(self.tavily_key_cycle)
# 构建Tavily搜索参数
search_params = {
"query": query,
"max_results": num_results,
"search_depth": "basic",
"include_answer": False,
"include_raw_content": False
}
# 根据时间范围调整搜索参数
if time_range == "week":
search_params["days"] = 7
elif time_range == "month":
search_params["days"] = 30
loop = asyncio.get_running_loop()
func = functools.partial(tavily_client.search, **search_params)
search_response = await loop.run_in_executor(None, func)
results = []
if search_response and "results" in search_response:
for res in search_response["results"]:
results.append({
"title": res.get("title", "无标题"),
"url": res.get("url", ""),
"snippet": res.get("content", "")[:300] + "..." if res.get("content") else "无摘要",
"provider": "Tavily"
})
return results
except Exception as e:
logger.error(f"Tavily 搜索失败: {e}")
return []
async def _search_ddg(self, args: Dict[str, Any]) -> List[Dict[str, Any]]:
query = args["query"]
num_results = args.get("num_results", 3)
try:
async with aDDGS() as ddgs:
search_response = await ddgs.text(query, max_results=num_results)
return [
{
"title": r.get("title"),
"url": r.get("href"),
"snippet": r.get("body"),
"provider": "DuckDuckGo"
}
for r in search_response
]
except Exception as e:
logger.error(f"DuckDuckGo 搜索失败: {e}")
return []
async def _search_bing(self, args: Dict[str, Any]) -> List[Dict[str, Any]]:
query = args["query"]
num_results = args.get("num_results", 3)
try:
loop = asyncio.get_running_loop()
func = functools.partial(self.bing_search.search, query, num_results=num_results)
search_response = await loop.run_in_executor(None, func)
if search_response:
return [
{
"title": r.get("title"),
"url": r.get("url"),
"snippet": r.get("abstract"),
"provider": "Bing"
}
for r in search_response
]
except Exception as e:
logger.error(f"Bing 搜索失败: {e}")
return []
def _format_results(self, results: List[Dict[str, Any]]) -> str:
if not results:
return "没有找到相关的网络信息。"
formatted_string = "根据网络搜索结果:\n\n"
for i, res in enumerate(results, 1):
title = res.get("title", '无标题')
url = res.get("url", '#')
snippet = res.get("snippet", '无摘要')
provider = res.get("provider", "未知来源")
formatted_string += f"{i}. **{title}** (来自: {provider})\n"
formatted_string += f" - 摘要: {snippet}\n"
formatted_string += f" - 来源: {url}\n\n"
return formatted_string
class URLParserTool(BaseTool):
"""
一个用于解析和总结一个或多个网页URL内容的工具。
"""
name: str = "parse_url"
description: str = "当需要理解一个或多个特定网页链接的内容时,使用此工具。例如:'这些网页讲了什么?[https://example.com, https://example2.com]''帮我总结一下这些文章'"
available_for_llm: bool = True
parameters = [
("urls", ToolParamType.STRING, "要理解的网站", True, None),
]
def __init__(self, plugin_config=None):
super().__init__(plugin_config)
# 初始化EXA API密钥轮询器
self.exa_clients = []
self.exa_key_cycle = None
# 优先从主配置文件读取,如果没有则从插件配置文件读取
EXA_API_KEYS = config_api.get_global_config("exa.api_keys", None)
if EXA_API_KEYS is None:
# 从插件配置文件读取
EXA_API_KEYS = self.get_config("exa.api_keys", [])
if isinstance(EXA_API_KEYS, list) and EXA_API_KEYS:
valid_keys = [key.strip() for key in EXA_API_KEYS if isinstance(key, str) and key.strip() not in ("None", "")]
if valid_keys:
self.exa_clients = [Exa(api_key=key) for key in valid_keys]
self.exa_key_cycle = itertools.cycle(self.exa_clients)
logger.info(f"URL解析工具已配置 {len(valid_keys)} 个 Exa API 密钥")
else:
logger.warning("Exa API Keys 配置无效URL解析功能将受限。")
else:
logger.warning("Exa API Keys 未配置URL解析功能将受限。")
async def _local_parse_and_summarize(self, url: str) -> Dict[str, Any]:
"""
使用本地库(httpx, BeautifulSoup)解析URL并调用LLM进行总结。
"""
try:
# 读取代理配置
enable_proxy = self.get_config("proxy.enable_proxy", False)
proxies = None
if enable_proxy:
socks5_proxy = self.get_config("proxy.socks5_proxy", None)
http_proxy = self.get_config("proxy.http_proxy", None)
https_proxy = self.get_config("proxy.https_proxy", None)
# 优先使用SOCKS5代理全协议代理
if socks5_proxy:
proxies = socks5_proxy
logger.info(f"使用SOCKS5代理: {socks5_proxy}")
elif http_proxy or https_proxy:
proxies = {}
if http_proxy:
proxies["http://"] = http_proxy
if https_proxy:
proxies["https://"] = https_proxy
logger.info(f"使用HTTP/HTTPS代理配置: {proxies}")
client_kwargs = {"timeout": 15.0, "follow_redirects": True}
if proxies:
client_kwargs["proxies"] = proxies
async with httpx.AsyncClient(**client_kwargs) as client:
response = await client.get(url)
response.raise_for_status()
soup = BeautifulSoup(response.text, "html.parser")
title = soup.title.string if soup.title else "无标题"
for script in soup(["script", "style"]):
script.extract()
text = soup.get_text(separator="\n", strip=True)
if not text:
return {"error": "无法从页面提取有效文本内容。"}
summary_prompt = f"请根据以下网页内容生成一段不超过300字的中文摘要保留核心信息和关键点:\n\n---\n\n标题: {title}\n\n内容:\n{text[:4000]}\n\n---\n\n摘要:"
text_model = str(self.get_config("models.text_model", "replyer_1"))
models = llm_api.get_available_models()
model_config = models.get(text_model)
if not model_config:
logger.error("未配置LLM模型")
return {"error": "未配置LLM模型"}
success, summary, reasoning, model_name = await llm_api.generate_with_model(
prompt=summary_prompt,
model_config=model_config,
request_type="story.generate",
temperature=0.3,
max_tokens=1000
)
if not success:
logger.info(f"生成摘要失败: {summary}")
return {"error": "发生ai错误"}
logger.info(f"成功生成摘要内容:'{summary}'")
return {
"title": title,
"url": url,
"snippet": summary,
"source": "local"
}
except httpx.HTTPStatusError as e:
logger.warning(f"本地解析URL '{url}' 失败 (HTTP {e.response.status_code})")
return {"error": f"请求失败,状态码: {e.response.status_code}"}
except Exception as e:
logger.error(f"本地解析或总结URL '{url}' 时发生未知异常: {e}", exc_info=True)
return {"error": f"发生未知错误: {str(e)}"}
async def execute(self, function_args: Dict[str, Any]) -> Dict[str, Any]:
"""
执行URL内容提取和总结。优先使用Exa失败后尝试本地解析。
"""
# 获取当前文件路径用于缓存键
import os
current_file_path = os.path.abspath(__file__)
# 检查缓存
cached_result = await tool_cache.get(self.name, function_args, current_file_path)
if cached_result:
logger.info(f"缓存命中: {self.name} -> {function_args}")
return cached_result
urls_input = function_args.get("urls")
if not urls_input:
return {"error": "URL列表不能为空。"}
# 处理URL输入确保是列表格式
if isinstance(urls_input, str):
# 如果是字符串尝试解析为URL列表
import re
# 提取所有HTTP/HTTPS URL
url_pattern = r'https?://[^\s\],]+'
urls = re.findall(url_pattern, urls_input)
if not urls:
# 如果没有找到标准URL将整个字符串作为单个URL
if urls_input.strip().startswith(('http://', 'https://')):
urls = [urls_input.strip()]
else:
return {"error": "提供的字符串中未找到有效的URL。"}
elif isinstance(urls_input, list):
urls = [url.strip() for url in urls_input if isinstance(url, str) and url.strip()]
else:
return {"error": "URL格式不正确应为字符串或列表。"}
# 验证URL格式
valid_urls = []
for url in urls:
if url.startswith(('http://', 'https://')):
valid_urls.append(url)
else:
logger.warning(f"跳过无效URL: {url}")
if not valid_urls:
return {"error": "未找到有效的URL。"}
urls = valid_urls
logger.info(f"准备解析 {len(urls)} 个URL: {urls}")
successful_results = []
error_messages = []
urls_to_retry_locally = []
# 步骤 1: 尝试使用 Exa API 进行解析
contents_response = None
if self.exa_key_cycle:
logger.info(f"开始使用 Exa API 解析URL: {urls}")
try:
# 使用轮询机制获取下一个客户端
exa_client = next(self.exa_key_cycle)
loop = asyncio.get_running_loop()
exa_params = {"text": True, "summary": True, "highlights": True}
func = functools.partial(exa_client.get_contents, urls, **exa_params)
contents_response = await loop.run_in_executor(None, func)
except Exception as e:
logger.error(f"执行 Exa URL解析时发生严重异常: {e}", exc_info=True)
contents_response = None # 确保异常后为None
# 步骤 2: 处理Exa的响应
if contents_response and hasattr(contents_response, 'statuses'):
results_map = {res.url: res for res in contents_response.results} if hasattr(contents_response, 'results') else {}
if contents_response.statuses:
for status in contents_response.statuses:
if status.status == 'success':
res = results_map.get(status.id)
if res:
summary = getattr(res, 'summary', '')
highlights = " ".join(getattr(res, 'highlights', []))
text_snippet = (getattr(res, 'text', '')[:300] + '...') if getattr(res, 'text', '') else ''
snippet = summary or highlights or text_snippet or '无摘要'
successful_results.append({
"title": getattr(res, 'title', '无标题'),
"url": getattr(res, 'url', status.id),
"snippet": snippet,
"source": "exa"
})
else:
error_tag = getattr(status, 'error', '未知错误')
logger.warning(f"Exa解析URL '{status.id}' 失败: {error_tag}。准备本地重试。")
urls_to_retry_locally.append(status.id)
else:
# 如果Exa未配置、API调用失败或返回无效响应则所有URL都进入本地重试
urls_to_retry_locally.extend(url for url in urls if url not in [res['url'] for res in successful_results])
# 步骤 3: 对失败的URL进行本地解析
if urls_to_retry_locally:
logger.info(f"开始本地解析以下URL: {urls_to_retry_locally}")
local_tasks = [self._local_parse_and_summarize(url) for url in urls_to_retry_locally]
local_results = await asyncio.gather(*local_tasks)
for i, res in enumerate(local_results):
url = urls_to_retry_locally[i]
if "error" in res:
error_messages.append(f"URL: {url} - 解析失败: {res['error']}")
else:
successful_results.append(res)
if not successful_results:
return {"error": "无法从所有给定的URL获取内容。", "details": error_messages}
formatted_content = self._format_results(successful_results)
result = {
"type": "url_parse_result",
"content": formatted_content,
"errors": error_messages
}
# 保存到缓存
import os
current_file_path = os.path.abspath(__file__)
if "error" not in result:
await tool_cache.set(self.name, function_args, current_file_path, result)
return result
def _format_results(self, results: List[Dict[str, Any]]) -> str:
"""
将成功解析的结果列表格式化为一段简洁的文本。
"""
formatted_parts = []
for res in results:
title = res.get('title', '无标题')
url = res.get('url', '#')
snippet = res.get('snippet', '无摘要')
source = res.get('source', '未知')
formatted_string = f"**{title}**\n"
formatted_string += f"**内容摘要**:\n{snippet}\n"
formatted_string += f"**来源**: {url} (由 {source} 解析)\n"
formatted_parts.append(formatted_string)
return "\n---\n".join(formatted_parts)
@register_plugin
class WEBSEARCHPLUGIN(BasePlugin):
"""
网络搜索工具插件
提供网络搜索和URL解析功能支持多种搜索引擎
- Exa (需要API密钥)
- Tavily (需要API密钥)
- DuckDuckGo (免费)
- Bing (免费)
"""
# 插件基本信息
plugin_name: str = "web_search_tool" # 内部标识符
enable_plugin: bool = True
dependencies: List[str] = [] # 插件依赖列表
# Python包依赖列表 - 支持两种格式:
# 方式1: 简单字符串列表(向后兼容)
# python_dependencies: List[str] = ["asyncddgs", "exa_py", "httpx[socks]"]
# 方式2: 详细的PythonDependency对象推荐
def __init__(self, *args, **kwargs):
"""初始化插件,立即加载所有搜索引擎"""
super().__init__(*args, **kwargs)
# 立即初始化所有搜索引擎触发API密钥管理器的日志输出
logger.info("🚀 正在初始化所有搜索引擎...")
try:
from .engines.exa_engine import ExaSearchEngine
from .engines.tavily_engine import TavilySearchEngine
from .engines.ddg_engine import DDGSearchEngine
from .engines.bing_engine import BingSearchEngine
# 实例化所有搜索引擎这会触发API密钥管理器的初始化
exa_engine = ExaSearchEngine()
tavily_engine = TavilySearchEngine()
ddg_engine = DDGSearchEngine()
bing_engine = BingSearchEngine()
# 报告每个引擎的状态
engines_status = {
"Exa": exa_engine.is_available(),
"Tavily": tavily_engine.is_available(),
"DuckDuckGo": ddg_engine.is_available(),
"Bing": bing_engine.is_available()
}
available_engines = [name for name, available in engines_status.items() if available]
unavailable_engines = [name for name, available in engines_status.items() if not available]
if available_engines:
logger.info(f"✅ 可用搜索引擎: {', '.join(available_engines)}")
if unavailable_engines:
logger.info(f"❌ 不可用搜索引擎: {', '.join(unavailable_engines)}")
except Exception as e:
logger.error(f"❌ 搜索引擎初始化失败: {e}", exc_info=True)
# Python包依赖列表
python_dependencies: List[PythonDependency] = [
PythonDependency(
package_name="asyncddgs",
@@ -677,7 +104,10 @@ class WEBSEARCHPLUGIN(BasePlugin):
config_file_name: str = "config.toml" # 配置文件名
# 配置节描述
config_section_descriptions = {"plugin": "插件基本信息", "proxy": "链接本地解析代理配置"}
config_section_descriptions = {
"plugin": "插件基本信息",
"proxy": "链接本地解析代理配置"
}
# 配置Schema定义
# 注意EXA配置和组件设置已迁移到主配置文件(bot_config.toml)的[exa]和[web_search]部分
@@ -688,17 +118,43 @@ class WEBSEARCHPLUGIN(BasePlugin):
"enabled": ConfigField(type=bool, default=False, description="是否启用插件"),
},
"proxy": {
"http_proxy": ConfigField(type=str, default=None, description="HTTP代理地址格式如: http://proxy.example.com:8080"),
"https_proxy": ConfigField(type=str, default=None, description="HTTPS代理地址格式如: http://proxy.example.com:8080"),
"socks5_proxy": ConfigField(type=str, default=None, description="SOCKS5代理地址格式如: socks5://proxy.example.com:1080"),
"enable_proxy": ConfigField(type=bool, default=False, description="是否启用代理")
"http_proxy": ConfigField(
type=str,
default=None,
description="HTTP代理地址格式如: http://proxy.example.com:8080"
),
"https_proxy": ConfigField(
type=str,
default=None,
description="HTTPS代理地址格式如: http://proxy.example.com:8080"
),
"socks5_proxy": ConfigField(
type=str,
default=None,
description="SOCKS5代理地址格式如: socks5://proxy.example.com:1080"
),
"enable_proxy": ConfigField(
type=bool,
default=False,
description="是否启用代理"
)
},
}
def get_plugin_components(self) -> List[Tuple[ComponentInfo, Type]]:
enable_tool =[]
"""
获取插件组件列表
Returns:
组件信息和类型的元组列表
"""
enable_tool = []
# 从主配置文件读取组件启用配置
if config_api.get_global_config("web_search.enable_web_search_tool", True):
if True:
enable_tool.append((WebSurfingTool.get_tool_info(), WebSurfingTool))
if config_api.get_global_config("web_search.enable_url_tool", True):
enable_tool.append((URLParserTool.get_tool_info(), URLParserTool))
return enable_tool

View File

@@ -0,0 +1,3 @@
"""
Search engines package
"""

View File

@@ -0,0 +1,31 @@
"""
Base search engine interface
"""
from abc import ABC, abstractmethod
from typing import Dict, List, Any
class BaseSearchEngine(ABC):
"""
搜索引擎基类
"""
@abstractmethod
async def search(self, args: Dict[str, Any]) -> List[Dict[str, Any]]:
"""
执行搜索
Args:
args: 搜索参数,包含 query、num_results、time_range 等
Returns:
搜索结果列表,每个结果包含 title、url、snippet、provider 字段
"""
pass
@abstractmethod
def is_available(self) -> bool:
"""
检查搜索引擎是否可用
"""
pass
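A concrete engine only needs to implement the two abstract methods. A trivial, purely illustrative subclass of the BaseSearchEngine defined above shows the expected result shape:

from typing import Any, Dict, List


class DummySearchEngine(BaseSearchEngine):  # BaseSearchEngine as defined in base.py above
    """A throwaway engine used only to illustrate the interface."""

    def is_available(self) -> bool:
        return True  # No API key needed for the dummy engine.

    async def search(self, args: Dict[str, Any]) -> List[Dict[str, Any]]:
        query = args["query"]
        # Every engine returns the same four fields so results from different engines can be merged.
        results = [{
            "title": f"Echo: {query}",
            "url": "https://example.com",
            "snippet": "Placeholder result.",
            "provider": "Dummy",
        }]
        return results[: args.get("num_results", 3)]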

View File

@@ -0,0 +1,265 @@
"""
Bing search engine implementation
"""
import asyncio
import functools
import random
import os
import traceback
from typing import Dict, List, Any
from datetime import datetime, timedelta
import requests
from bs4 import BeautifulSoup
from src.common.logger import get_logger
from .base import BaseSearchEngine
logger = get_logger("bing_engine")
ABSTRACT_MAX_LENGTH = 300 # abstract max length
user_agents = [
# Edge浏览器
"Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/122.0.0.0 Safari/537.36 Edg/122.0.0.0",
"Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/121.0.0.0 Safari/537.36 Edg/121.0.0.0",
"Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/122.0.0.0 Safari/537.36 Edg/122.0.0.0",
# Chrome浏览器
"Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/122.0.0.0 Safari/537.36",
"Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/121.0.0.0 Safari/537.36",
"Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/122.0.0.0 Safari/537.36",
# Firefox浏览器
"Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:123.0) Gecko/20100101 Firefox/123.0",
"Mozilla/5.0 (Macintosh; Intel Mac OS X 10.15; rv:123.0) Gecko/20100101 Firefox/123.0",
"Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:122.0) Gecko/20100101 Firefox/122.0",
]
# 请求头信息
HEADERS = {
"Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.7",
"Accept-Encoding": "gzip, deflate, br",
"Accept-Language": "zh-CN,zh;q=0.9,en;q=0.8,en-GB;q=0.7,en-US;q=0.6",
"Cache-Control": "max-age=0",
"Connection": "keep-alive",
"Host": "www.bing.com",
"Referer": "https://www.bing.com/",
"Sec-Ch-Ua": '"Chromium";v="122", "Microsoft Edge";v="122", "Not-A.Brand";v="99"',
"Sec-Ch-Ua-Mobile": "?0",
"Sec-Ch-Ua-Platform": '"Windows"',
"Sec-Fetch-Dest": "document",
"Sec-Fetch-Mode": "navigate",
"Sec-Fetch-Site": "same-origin",
"Sec-Fetch-User": "?1",
"Upgrade-Insecure-Requests": "1",
"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/122.0.0.0 Safari/537.36 Edg/122.0.0.0",
}
bing_search_url = "https://www.bing.com/search?q="
class BingSearchEngine(BaseSearchEngine):
"""
Bing搜索引擎实现
"""
def __init__(self):
self.session = requests.Session()
self.session.headers = HEADERS
def is_available(self) -> bool:
"""检查Bing搜索引擎是否可用"""
return True # Bing是免费搜索引擎总是可用
async def search(self, args: Dict[str, Any]) -> List[Dict[str, Any]]:
"""执行Bing搜索"""
query = args["query"]
num_results = args.get("num_results", 3)
time_range = args.get("time_range", "any")
try:
loop = asyncio.get_running_loop()
func = functools.partial(self._search_sync, query, num_results, time_range)
search_response = await loop.run_in_executor(None, func)
return search_response
except Exception as e:
logger.error(f"Bing 搜索失败: {e}")
return []
def _search_sync(self, keyword: str, num_results: int, time_range: str) -> List[Dict[str, Any]]:
"""同步执行Bing搜索"""
if not keyword:
return []
list_result = []
# 构建搜索URL
search_url = bing_search_url + keyword
# 如果指定了时间范围,添加时间过滤参数
if time_range == "week":
search_url += "&qft=+filterui:date-range-7"
elif time_range == "month":
search_url += "&qft=+filterui:date-range-30"
try:
data = self._parse_html(search_url)
if data:
list_result.extend(data)
logger.debug(f"Bing搜索 [{keyword}] 找到 {len(data)} 个结果")
except Exception as e:
logger.error(f"Bing搜索解析失败: {e}")
return []
logger.debug(f"Bing搜索 [{keyword}] 完成,总共 {len(list_result)} 个结果")
return list_result[:num_results] if len(list_result) > num_results else list_result
def _parse_html(self, url: str) -> List[Dict[str, Any]]:
"""解析处理结果"""
try:
logger.debug(f"访问Bing搜索URL: {url}")
# 设置必要的Cookie
cookies = {
"SRCHHPGUSR": "SRCHLANG=zh-Hans", # 设置默认搜索语言为中文
"SRCHD": "AF=NOFORM",
"SRCHUID": "V=2&GUID=1A4D4F1C8844493F9A2E3DB0D1BC806C",
"_SS": "SID=0D89D9A3C95C60B62E7AC80CC85461B3",
"_EDGE_S": "ui=zh-cn", # 设置界面语言为中文
"_EDGE_V": "1",
}
# 为每次请求随机选择不同的用户代理,降低被屏蔽风险
headers = HEADERS.copy()
headers["User-Agent"] = random.choice(user_agents)
# 创建新的session
session = requests.Session()
session.headers.update(headers)
session.cookies.update(cookies)
# 发送请求
try:
res = session.get(url=url, timeout=(3.05, 6), verify=True, allow_redirects=True)
except (requests.exceptions.Timeout, requests.exceptions.ConnectionError) as e:
logger.warning(f"第一次请求超时,正在重试: {str(e)}")
try:
res = session.get(url=url, timeout=(5, 10), verify=False)
except Exception as e2:
logger.error(f"第二次请求也失败: {str(e2)}")
return []
res.encoding = "utf-8"
# 检查响应状态
if res.status_code == 403:
logger.error("被禁止访问 (403 Forbidden)可能是IP被限制")
return []
if res.status_code != 200:
logger.error(f"必应搜索请求失败,状态码: {res.status_code}")
return []
# 检查是否被重定向到登录页面或验证页面
if "login.live.com" in res.url or "login.microsoftonline.com" in res.url:
logger.error("被重定向到登录页面,可能需要登录")
return []
if "https://www.bing.com/ck/a" in res.url:
logger.error("被重定向到验证页面,可能被识别为机器人")
return []
# 解析HTML
try:
root = BeautifulSoup(res.text, "lxml")
except Exception:
try:
root = BeautifulSoup(res.text, "html.parser")
except Exception as e:
logger.error(f"HTML解析失败: {str(e)}")
return []
list_data = []
# 尝试提取搜索结果
# 方法1: 查找标准的搜索结果容器
results = root.select("ol#b_results li.b_algo")
if results:
for rank, result in enumerate(results, 1):
# 提取标题和链接
title_link = result.select_one("h2 a")
if not title_link:
continue
title = title_link.get_text().strip()
url = title_link.get("href", "")
# 提取摘要
abstract = ""
abstract_elem = result.select_one("div.b_caption p")
if abstract_elem:
abstract = abstract_elem.get_text().strip()
# 限制摘要长度
if ABSTRACT_MAX_LENGTH and len(abstract) > ABSTRACT_MAX_LENGTH:
abstract = abstract[:ABSTRACT_MAX_LENGTH] + "..."
list_data.append({
"title": title,
"url": url,
"snippet": abstract,
"provider": "Bing"
})
if len(list_data) >= 10: # 限制结果数量
break
# 方法2: 如果标准方法没找到结果,使用备用方法
if not list_data:
# 查找所有可能的搜索结果链接
all_links = root.find_all("a")
for link in all_links:
href = link.get("href", "")
text = link.get_text().strip()
# 过滤有效的搜索结果链接
if (href and text and len(text) > 10
and not href.startswith("javascript:")
and not href.startswith("#")
and "http" in href
and not any(x in href for x in [
"bing.com/search", "bing.com/images", "bing.com/videos",
"bing.com/maps", "bing.com/news", "login", "account",
"microsoft", "javascript"
])):
# 尝试获取摘要
abstract = ""
parent = link.parent
if parent and parent.get_text():
full_text = parent.get_text().strip()
if len(full_text) > len(text):
abstract = full_text.replace(text, "", 1).strip()
# 限制摘要长度
if ABSTRACT_MAX_LENGTH and len(abstract) > ABSTRACT_MAX_LENGTH:
abstract = abstract[:ABSTRACT_MAX_LENGTH] + "..."
list_data.append({
"title": text,
"url": href,
"snippet": abstract,
"provider": "Bing"
})
if len(list_data) >= 10:
break
logger.debug(f"从Bing解析到 {len(list_data)} 个搜索结果")
return list_data
except Exception as e:
logger.error(f"解析Bing页面时出错: {str(e)}")
logger.debug(traceback.format_exc())
return []

View File

@@ -0,0 +1,42 @@
"""
DuckDuckGo search engine implementation
"""
from typing import Dict, List, Any
from asyncddgs import aDDGS
from src.common.logger import get_logger
from .base import BaseSearchEngine
logger = get_logger("ddg_engine")
class DDGSearchEngine(BaseSearchEngine):
"""
DuckDuckGo搜索引擎实现
"""
def is_available(self) -> bool:
"""检查DuckDuckGo搜索引擎是否可用"""
return True # DuckDuckGo不需要API密钥总是可用
async def search(self, args: Dict[str, Any]) -> List[Dict[str, Any]]:
"""执行DuckDuckGo搜索"""
query = args["query"]
num_results = args.get("num_results", 3)
try:
async with aDDGS() as ddgs:
search_response = await ddgs.text(query, max_results=num_results)
return [
{
"title": r.get("title"),
"url": r.get("href"),
"snippet": r.get("body"),
"provider": "DuckDuckGo"
}
for r in search_response
]
except Exception as e:
logger.error(f"DuckDuckGo 搜索失败: {e}")
return []

View File

@@ -0,0 +1,79 @@
"""
Exa search engine implementation
"""
import asyncio
import functools
from datetime import datetime, timedelta
from typing import Dict, List, Any
from exa_py import Exa
from src.common.logger import get_logger
from src.plugin_system.apis import config_api
from .base import BaseSearchEngine
from ..utils.api_key_manager import create_api_key_manager_from_config
logger = get_logger("exa_engine")
class ExaSearchEngine(BaseSearchEngine):
"""
Exa搜索引擎实现
"""
def __init__(self):
self._initialize_clients()
def _initialize_clients(self):
"""初始化Exa客户端"""
# 从主配置文件读取API密钥
exa_api_keys = config_api.get_global_config("exa.api_keys", None)
# 创建API密钥管理器
self.api_manager = create_api_key_manager_from_config(
exa_api_keys,
lambda key: Exa(api_key=key),
"Exa"
)
def is_available(self) -> bool:
"""检查Exa搜索引擎是否可用"""
return self.api_manager.is_available()
async def search(self, args: Dict[str, Any]) -> List[Dict[str, Any]]:
"""执行Exa搜索"""
if not self.is_available():
return []
query = args["query"]
num_results = args.get("num_results", 3)
time_range = args.get("time_range", "any")
exa_args = {"num_results": num_results, "text": True, "highlights": True}
if time_range != "any":
today = datetime.now()
start_date = today - timedelta(days=7 if time_range == "week" else 30)
exa_args["start_published_date"] = start_date.strftime('%Y-%m-%d')
try:
# 使用API密钥管理器获取下一个客户端
exa_client = self.api_manager.get_next_client()
if not exa_client:
logger.error("无法获取Exa客户端")
return []
loop = asyncio.get_running_loop()
func = functools.partial(exa_client.search_and_contents, query, **exa_args)
search_response = await loop.run_in_executor(None, func)
return [
{
"title": res.title,
"url": res.url,
"snippet": " ".join(getattr(res, 'highlights', [])) or (getattr(res, 'text', '')[:250] + '...'),
"provider": "Exa"
}
for res in search_response.results
]
except Exception as e:
logger.error(f"Exa 搜索失败: {e}")
return []
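Both new engines delegate key rotation to create_api_key_manager_from_config in ..utils.api_key_manager, which is not part of this diff. A minimal sketch of what such a round-robin manager might look like, inferred from how it is called here (an assumption, not the actual module):

import itertools
from typing import Any, Callable, List, Optional

from src.common.logger import get_logger

logger = get_logger("api_key_manager")


class ApiKeyManager:
    """Rotates pre-built clients created from a list of API keys."""

    def __init__(self, clients: List[Any]):
        self._clients = clients
        self._cycle = itertools.cycle(clients) if clients else None

    def is_available(self) -> bool:
        return bool(self._clients)

    def get_next_client(self) -> Optional[Any]:
        return next(self._cycle) if self._cycle else None


def create_api_key_manager_from_config(
    api_keys: Optional[List[str]],
    client_factory: Callable[[str], Any],
    name: str,
) -> ApiKeyManager:
    keys = api_keys if isinstance(api_keys, list) else []
    valid_keys = [k.strip() for k in keys if isinstance(k, str) and k.strip() not in ("None", "")]
    if valid_keys:
        logger.info(f"Configured {len(valid_keys)} {name} API key(s)")
        return ApiKeyManager([client_factory(key) for key in valid_keys])
    logger.warning(f"No valid {name} API keys configured; the engine will be unavailable.")
    return ApiKeyManager([])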

View File

@@ -0,0 +1,90 @@
"""
Tavily search engine implementation
"""
import asyncio
import functools
from typing import Dict, List, Any
from tavily import TavilyClient
from src.common.logger import get_logger
from src.plugin_system.apis import config_api
from .base import BaseSearchEngine
from ..utils.api_key_manager import create_api_key_manager_from_config
logger = get_logger("tavily_engine")
class TavilySearchEngine(BaseSearchEngine):
"""
Tavily搜索引擎实现
"""
def __init__(self):
self._initialize_clients()
def _initialize_clients(self):
"""初始化Tavily客户端"""
# 从主配置文件读取API密钥
tavily_api_keys = config_api.get_global_config("tavily.api_keys", None)
# 创建API密钥管理器
self.api_manager = create_api_key_manager_from_config(
tavily_api_keys,
lambda key: TavilyClient(api_key=key),
"Tavily"
)
def is_available(self) -> bool:
"""检查Tavily搜索引擎是否可用"""
return self.api_manager.is_available()
async def search(self, args: Dict[str, Any]) -> List[Dict[str, Any]]:
"""执行Tavily搜索"""
if not self.is_available():
return []
query = args["query"]
num_results = args.get("num_results", 3)
time_range = args.get("time_range", "any")
try:
# 使用API密钥管理器获取下一个客户端
tavily_client = self.api_manager.get_next_client()
if not tavily_client:
logger.error("无法获取Tavily客户端")
return []
# 构建Tavily搜索参数
search_params = {
"query": query,
"max_results": num_results,
"search_depth": "basic",
"include_answer": False,
"include_raw_content": False
}
# 根据时间范围调整搜索参数
if time_range == "week":
search_params["days"] = 7
elif time_range == "month":
search_params["days"] = 30
loop = asyncio.get_running_loop()
func = functools.partial(tavily_client.search, **search_params)
search_response = await loop.run_in_executor(None, func)
results = []
if search_response and "results" in search_response:
for res in search_response["results"]:
results.append({
"title": res.get("title", "无标题"),
"url": res.get("url", ""),
"snippet": res.get("content", "")[:300] + "..." if res.get("content") else "无摘要",
"provider": "Tavily"
})
return results
except Exception as e:
logger.error(f"Tavily 搜索失败: {e}")
return []

View File

@@ -0,0 +1,160 @@
"""
Web Search Tool Plugin
一个功能强大的网络搜索和URL解析插件支持多种搜索引擎和解析策略。
"""
from typing import List, Tuple, Type
from src.plugin_system import (
BasePlugin,
register_plugin,
ComponentInfo,
ConfigField,
PythonDependency
)
from src.plugin_system.apis import config_api
from src.common.logger import get_logger
from .tools.web_search import WebSurfingTool
from .tools.url_parser import URLParserTool
logger = get_logger("web_search_plugin")
@register_plugin
class WEBSEARCHPLUGIN(BasePlugin):
"""
网络搜索工具插件
提供网络搜索和URL解析功能支持多种搜索引擎
- Exa (需要API密钥)
- Tavily (需要API密钥)
- DuckDuckGo (免费)
- Bing (免费)
"""
# 插件基本信息
plugin_name: str = "web_search_tool" # 内部标识符
enable_plugin: bool = True
dependencies: List[str] = [] # 插件依赖列表
def __init__(self, *args, **kwargs):
"""初始化插件,立即加载所有搜索引擎"""
super().__init__(*args, **kwargs)
# 立即初始化所有搜索引擎触发API密钥管理器的日志输出
logger.info("🚀 正在初始化所有搜索引擎...")
try:
from .engines.exa_engine import ExaSearchEngine
from .engines.tavily_engine import TavilySearchEngine
from .engines.ddg_engine import DDGSearchEngine
from .engines.bing_engine import BingSearchEngine
# 实例化所有搜索引擎这会触发API密钥管理器的初始化
exa_engine = ExaSearchEngine()
tavily_engine = TavilySearchEngine()
ddg_engine = DDGSearchEngine()
bing_engine = BingSearchEngine()
# 报告每个引擎的状态
engines_status = {
"Exa": exa_engine.is_available(),
"Tavily": tavily_engine.is_available(),
"DuckDuckGo": ddg_engine.is_available(),
"Bing": bing_engine.is_available()
}
available_engines = [name for name, available in engines_status.items() if available]
unavailable_engines = [name for name, available in engines_status.items() if not available]
if available_engines:
logger.info(f"✅ 可用搜索引擎: {', '.join(available_engines)}")
if unavailable_engines:
logger.info(f"❌ 不可用搜索引擎: {', '.join(unavailable_engines)}")
except Exception as e:
logger.error(f"❌ 搜索引擎初始化失败: {e}", exc_info=True)
# Python包依赖列表
python_dependencies: List[PythonDependency] = [
PythonDependency(
package_name="asyncddgs",
description="异步DuckDuckGo搜索库",
optional=False
),
PythonDependency(
package_name="exa_py",
description="Exa搜索API客户端库",
optional=True # 如果没有API密钥这个是可选的
),
PythonDependency(
package_name="tavily",
install_name="tavily-python", # 安装时使用这个名称
description="Tavily搜索API客户端库",
optional=True # 如果没有API密钥这个是可选的
),
PythonDependency(
package_name="httpx",
version=">=0.20.0",
install_name="httpx[socks]", # 安装时使用这个名称(包含可选依赖)
description="支持SOCKS代理的HTTP客户端库",
optional=False
)
]
config_file_name: str = "config.toml" # 配置文件名
# 配置节描述
config_section_descriptions = {
"plugin": "插件基本信息",
"proxy": "链接本地解析代理配置"
}
# 配置Schema定义
# 注意EXA配置和组件设置已迁移到主配置文件(bot_config.toml)的[exa]和[web_search]部分
config_schema: dict = {
"plugin": {
"name": ConfigField(type=str, default="WEB_SEARCH_PLUGIN", description="插件名称"),
"version": ConfigField(type=str, default="1.0.0", description="插件版本"),
"enabled": ConfigField(type=bool, default=False, description="是否启用插件"),
},
"proxy": {
"http_proxy": ConfigField(
type=str,
default=None,
description="HTTP代理地址格式如: http://proxy.example.com:8080"
),
"https_proxy": ConfigField(
type=str,
default=None,
description="HTTPS代理地址格式如: http://proxy.example.com:8080"
),
"socks5_proxy": ConfigField(
type=str,
default=None,
description="SOCKS5代理地址格式如: socks5://proxy.example.com:1080"
),
"enable_proxy": ConfigField(
type=bool,
default=False,
description="是否启用代理"
)
},
}
def get_plugin_components(self) -> List[Tuple[ComponentInfo, Type]]:
"""
获取插件组件列表
Returns:
组件信息和类型的元组列表
"""
enable_tool = []
# 从主配置文件读取组件启用配置
if True:
enable_tool.append((WebSurfingTool.get_tool_info(), WebSurfingTool))
if config_api.get_global_config("web_search.enable_url_tool", True):
enable_tool.append((URLParserTool.get_tool_info(), URLParserTool))
return enable_tool

View File

@@ -0,0 +1,3 @@
"""
Tools package
"""

View File

@@ -0,0 +1,242 @@
"""
URL parser tool implementation
"""
import asyncio
import functools
from typing import Any, Dict, List
from exa_py import Exa
import httpx
from bs4 import BeautifulSoup
from src.common.logger import get_logger
from src.plugin_system import BaseTool, ToolParamType, llm_api
from src.plugin_system.apis import config_api
from src.common.cache_manager import tool_cache
from ..utils.formatters import format_url_parse_results
from ..utils.url_utils import parse_urls_from_input, validate_urls
from ..utils.api_key_manager import create_api_key_manager_from_config
logger = get_logger("url_parser_tool")
class URLParserTool(BaseTool):
"""
一个用于解析和总结一个或多个网页URL内容的工具。
"""
name: str = "parse_url"
description: str = "当需要理解一个或多个特定网页链接的内容时,使用此工具。例如:'这些网页讲了什么?[https://example.com, https://example2.com]''帮我总结一下这些文章'"
available_for_llm: bool = True
parameters = [
("urls", ToolParamType.STRING, "要理解的网站", True, None),
]
def __init__(self, plugin_config=None):
super().__init__(plugin_config)
self._initialize_exa_clients()
def _initialize_exa_clients(self):
"""初始化Exa客户端"""
# 优先从主配置文件读取,如果没有则从插件配置文件读取
exa_api_keys = config_api.get_global_config("exa.api_keys", None)
if exa_api_keys is None:
# 从插件配置文件读取
exa_api_keys = self.get_config("exa.api_keys", [])
# 创建API密钥管理器
self.api_manager = create_api_key_manager_from_config(
exa_api_keys,
lambda key: Exa(api_key=key),
"Exa URL Parser"
)
async def _local_parse_and_summarize(self, url: str) -> Dict[str, Any]:
"""
使用本地库(httpx, BeautifulSoup)解析URL并调用LLM进行总结。
"""
try:
# 读取代理配置
enable_proxy = self.get_config("proxy.enable_proxy", False)
proxies = None
if enable_proxy:
socks5_proxy = self.get_config("proxy.socks5_proxy", None)
http_proxy = self.get_config("proxy.http_proxy", None)
https_proxy = self.get_config("proxy.https_proxy", None)
# 优先使用SOCKS5代理全协议代理
if socks5_proxy:
proxies = socks5_proxy
logger.info(f"使用SOCKS5代理: {socks5_proxy}")
elif http_proxy or https_proxy:
proxies = {}
if http_proxy:
proxies["http://"] = http_proxy
if https_proxy:
proxies["https://"] = https_proxy
logger.info(f"使用HTTP/HTTPS代理配置: {proxies}")
client_kwargs = {"timeout": 15.0, "follow_redirects": True}
if proxies:
client_kwargs["proxies"] = proxies
async with httpx.AsyncClient(**client_kwargs) as client:
response = await client.get(url)
response.raise_for_status()
soup = BeautifulSoup(response.text, "html.parser")
title = soup.title.string if soup.title else "无标题"
for script in soup(["script", "style"]):
script.extract()
text = soup.get_text(separator="\n", strip=True)
if not text:
return {"error": "无法从页面提取有效文本内容。"}
summary_prompt = f"请根据以下网页内容生成一段不超过300字的中文摘要保留核心信息和关键点:\n\n---\n\n标题: {title}\n\n内容:\n{text[:4000]}\n\n---\n\n摘要:"
text_model = str(self.get_config("models.text_model", "replyer_1"))
models = llm_api.get_available_models()
model_config = models.get(text_model)
if not model_config:
logger.error("未配置LLM模型")
return {"error": "未配置LLM模型"}
success, summary, reasoning, model_name = await llm_api.generate_with_model(
prompt=summary_prompt,
model_config=model_config,
request_type="story.generate",
temperature=0.3,
max_tokens=1000
)
if not success:
logger.info(f"生成摘要失败: {summary}")
return {"error": "发生ai错误"}
logger.info(f"成功生成摘要内容:'{summary}'")
return {
"title": title,
"url": url,
"snippet": summary,
"source": "local"
}
except httpx.HTTPStatusError as e:
logger.warning(f"本地解析URL '{url}' 失败 (HTTP {e.response.status_code})")
return {"error": f"请求失败,状态码: {e.response.status_code}"}
except Exception as e:
logger.error(f"本地解析或总结URL '{url}' 时发生未知异常: {e}", exc_info=True)
return {"error": f"发生未知错误: {str(e)}"}
async def execute(self, function_args: Dict[str, Any]) -> Dict[str, Any]:
"""
执行URL内容提取和总结。优先使用Exa失败后尝试本地解析。
"""
# 获取当前文件路径用于缓存键
import os
current_file_path = os.path.abspath(__file__)
# 检查缓存
cached_result = await tool_cache.get(self.name, function_args, current_file_path)
if cached_result:
logger.info(f"缓存命中: {self.name} -> {function_args}")
return cached_result
urls_input = function_args.get("urls")
if not urls_input:
return {"error": "URL列表不能为空。"}
# 处理URL输入确保是列表格式
urls = parse_urls_from_input(urls_input)
if not urls:
return {"error": "提供的字符串中未找到有效的URL。"}
# 验证URL格式
valid_urls = validate_urls(urls)
if not valid_urls:
return {"error": "未找到有效的URL。"}
urls = valid_urls
logger.info(f"准备解析 {len(urls)} 个URL: {urls}")
successful_results = []
error_messages = []
urls_to_retry_locally = []
# 步骤 1: 尝试使用 Exa API 进行解析
contents_response = None
if self.api_manager.is_available():
logger.info(f"开始使用 Exa API 解析URL: {urls}")
try:
# 使用API密钥管理器获取下一个客户端
exa_client = self.api_manager.get_next_client()
if not exa_client:
logger.error("无法获取Exa客户端")
else:
loop = asyncio.get_running_loop()
exa_params = {"text": True, "summary": True, "highlights": True}
func = functools.partial(exa_client.get_contents, urls, **exa_params)
contents_response = await loop.run_in_executor(None, func)
except Exception as e:
logger.error(f"执行 Exa URL解析时发生严重异常: {e}", exc_info=True)
contents_response = None # 确保异常后为None
# 步骤 2: 处理Exa的响应
if contents_response and hasattr(contents_response, 'statuses'):
results_map = {res.url: res for res in contents_response.results} if hasattr(contents_response, 'results') else {}
if contents_response.statuses:
for status in contents_response.statuses:
if status.status == 'success':
res = results_map.get(status.id)
if res:
summary = getattr(res, 'summary', '')
highlights = " ".join(getattr(res, 'highlights', []))
text_snippet = (getattr(res, 'text', '')[:300] + '...') if getattr(res, 'text', '') else ''
snippet = summary or highlights or text_snippet or '无摘要'
successful_results.append({
"title": getattr(res, 'title', '无标题'),
"url": getattr(res, 'url', status.id),
"snippet": snippet,
"source": "exa"
})
else:
error_tag = getattr(status, 'error', '未知错误')
logger.warning(f"Exa解析URL '{status.id}' 失败: {error_tag}。准备本地重试。")
urls_to_retry_locally.append(status.id)
else:
# 如果Exa未配置、API调用失败或返回无效响应则所有URL都进入本地重试
urls_to_retry_locally.extend(url for url in urls if url not in [res['url'] for res in successful_results])
# 步骤 3: 对失败的URL进行本地解析
if urls_to_retry_locally:
logger.info(f"开始本地解析以下URL: {urls_to_retry_locally}")
local_tasks = [self._local_parse_and_summarize(url) for url in urls_to_retry_locally]
local_results = await asyncio.gather(*local_tasks)
for i, res in enumerate(local_results):
url = urls_to_retry_locally[i]
if "error" in res:
error_messages.append(f"URL: {url} - 解析失败: {res['error']}")
else:
successful_results.append(res)
if not successful_results:
return {"error": "无法从所有给定的URL获取内容。", "details": error_messages}
formatted_content = format_url_parse_results(successful_results)
result = {
"type": "url_parse_result",
"content": formatted_content,
"errors": error_messages
}
# 保存到缓存
if "error" not in result:
await tool_cache.set(self.name, function_args, current_file_path, result)
return result
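A minimal invocation sketch, assuming the surrounding plugin runtime (config_api, tool_cache and the project logger) is initialized; the direct instantiation and the _demo_parse helper name below are illustrative only, not part of the plugin's API.

import asyncio

async def _demo_parse():
    tool = URLParserTool()  # plugin_config left as None for this sketch
    result = await tool.execute({"urls": "https://example.com, https://example.org"})
    # On success, result["content"] holds the formatted summaries; otherwise result["error"].
    print(result.get("content") or result.get("error"))

# asyncio.run(_demo_parse())  # only meaningful inside the project environment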

View File

@@ -0,0 +1,164 @@
"""
Web search tool implementation
"""
import asyncio
from typing import Any, Dict, List
from src.common.logger import get_logger
from src.plugin_system import BaseTool, ToolParamType
from src.plugin_system.apis import config_api
from src.common.cache_manager import tool_cache
from ..engines.exa_engine import ExaSearchEngine
from ..engines.tavily_engine import TavilySearchEngine
from ..engines.ddg_engine import DDGSearchEngine
from ..engines.bing_engine import BingSearchEngine
from ..utils.formatters import format_search_results, deduplicate_results
logger = get_logger("web_search_tool")
class WebSurfingTool(BaseTool):
"""
网络搜索工具
"""
name: str = "web_search"
description: str = "用于执行网络搜索。当用户明确要求搜索,或者需要获取关于公司、产品、事件的最新信息、新闻或动态时,必须使用此工具"
available_for_llm: bool = True
parameters = [
("query", ToolParamType.STRING, "要搜索的关键词或问题。", True, None),
("num_results", ToolParamType.INTEGER, "期望每个搜索引擎返回的搜索结果数量默认为5。", False, None),
("time_range", ToolParamType.STRING, "指定搜索的时间范围,可以是 'any', 'week', 'month'。默认为 'any'", False, ["any", "week", "month"])
] # type: ignore
def __init__(self, plugin_config=None):
super().__init__(plugin_config)
# 初始化搜索引擎
self.engines = {
"exa": ExaSearchEngine(),
"tavily": TavilySearchEngine(),
"ddg": DDGSearchEngine(),
"bing": BingSearchEngine()
}
async def execute(self, function_args: Dict[str, Any]) -> Dict[str, Any]:
query = function_args.get("query")
if not query:
return {"error": "搜索查询不能为空。"}
# 获取当前文件路径用于缓存键
import os
current_file_path = os.path.abspath(__file__)
# 检查缓存
cached_result = await tool_cache.get(self.name, function_args, current_file_path, semantic_query=query)
if cached_result:
logger.info(f"缓存命中: {self.name} -> {function_args}")
return cached_result
# 读取搜索配置
enabled_engines = config_api.get_global_config("web_search.enabled_engines", ["ddg"])
search_strategy = config_api.get_global_config("web_search.search_strategy", "single")
logger.info(f"开始搜索,策略: {search_strategy}, 启用引擎: {enabled_engines}, 参数: '{function_args}'")
# 根据策略执行搜索
if search_strategy == "parallel":
result = await self._execute_parallel_search(function_args, enabled_engines)
elif search_strategy == "fallback":
result = await self._execute_fallback_search(function_args, enabled_engines)
else: # single
result = await self._execute_single_search(function_args, enabled_engines)
# 保存到缓存
if "error" not in result:
await tool_cache.set(self.name, function_args, current_file_path, result, semantic_query=query)
return result
async def _execute_parallel_search(self, function_args: Dict[str, Any], enabled_engines: List[str]) -> Dict[str, Any]:
"""并行搜索策略:同时使用所有启用的搜索引擎"""
search_tasks = []
for engine_name in enabled_engines:
engine = self.engines.get(engine_name)
if engine and engine.is_available():
custom_args = function_args.copy()
custom_args["num_results"] = custom_args.get("num_results", 5)
search_tasks.append(engine.search(custom_args))
if not search_tasks:
return {"error": "没有可用的搜索引擎。"}
try:
search_results_lists = await asyncio.gather(*search_tasks, return_exceptions=True)
all_results = []
for result in search_results_lists:
if isinstance(result, list):
all_results.extend(result)
elif isinstance(result, Exception):
logger.error(f"搜索时发生错误: {result}")
# 去重并格式化
unique_results = deduplicate_results(all_results)
formatted_content = format_search_results(unique_results)
return {
"type": "web_search_result",
"content": formatted_content,
}
except Exception as e:
logger.error(f"执行并行网络搜索时发生异常: {e}", exc_info=True)
return {"error": f"执行网络搜索时发生严重错误: {str(e)}"}
async def _execute_fallback_search(self, function_args: Dict[str, Any], enabled_engines: List[str]) -> Dict[str, Any]:
"""回退搜索策略:按顺序尝试搜索引擎,失败则尝试下一个"""
for engine_name in enabled_engines:
engine = self.engines.get(engine_name)
if not engine or not engine.is_available():
continue
try:
custom_args = function_args.copy()
custom_args["num_results"] = custom_args.get("num_results", 5)
results = await engine.search(custom_args)
if results: # 如果有结果,直接返回
formatted_content = format_search_results(results)
return {
"type": "web_search_result",
"content": formatted_content,
}
except Exception as e:
logger.warning(f"{engine_name} 搜索失败,尝试下一个引擎: {e}")
continue
return {"error": "所有搜索引擎都失败了。"}
async def _execute_single_search(self, function_args: Dict[str, Any], enabled_engines: List[str]) -> Dict[str, Any]:
"""单一搜索策略:只使用第一个可用的搜索引擎"""
for engine_name in enabled_engines:
engine = self.engines.get(engine_name)
if not engine or not engine.is_available():
continue
try:
custom_args = function_args.copy()
custom_args["num_results"] = custom_args.get("num_results", 5)
results = await engine.search(custom_args)
formatted_content = format_search_results(results)
return {
"type": "web_search_result",
"content": formatted_content,
}
except Exception as e:
logger.error(f"{engine_name} 搜索失败: {e}")
return {"error": f"{engine_name} 搜索失败: {str(e)}"}
return {"error": "没有可用的搜索引擎。"}

View File

@@ -0,0 +1,3 @@
"""
Web search tool utilities package
"""

View File

@@ -0,0 +1,84 @@
"""
API密钥管理器提供轮询机制
"""
import itertools
from typing import List, Optional, TypeVar, Generic, Callable
from src.common.logger import get_logger
logger = get_logger("api_key_manager")
T = TypeVar('T')
class APIKeyManager(Generic[T]):
"""
API密钥管理器支持轮询机制
"""
def __init__(self, api_keys: List[str], client_factory: Callable[[str], T], service_name: str = "Unknown"):
"""
初始化API密钥管理器
Args:
api_keys: API密钥列表
client_factory: 客户端工厂函数接受API密钥参数并返回客户端实例
service_name: 服务名称,用于日志记录
"""
self.service_name = service_name
self.clients: List[T] = []
self.client_cycle: Optional[itertools.cycle] = None
if api_keys:
# 过滤有效的API密钥排除None、空字符串、"None"字符串等
valid_keys = []
for key in api_keys:
if isinstance(key, str) and key.strip() and key.strip().lower() not in ("none", "null", ""):
valid_keys.append(key.strip())
if valid_keys:
try:
self.clients = [client_factory(key) for key in valid_keys]
self.client_cycle = itertools.cycle(self.clients)
logger.info(f"🔑 {service_name} 成功加载 {len(valid_keys)} 个 API 密钥")
except Exception as e:
logger.error(f"❌ 初始化 {service_name} 客户端失败: {e}")
self.clients = []
self.client_cycle = None
else:
logger.warning(f"⚠️ {service_name} API Keys 配置无效包含None或空值{service_name} 功能将不可用")
else:
logger.warning(f"⚠️ {service_name} API Keys 未配置,{service_name} 功能将不可用")
def is_available(self) -> bool:
"""检查是否有可用的客户端"""
return bool(self.clients and self.client_cycle)
def get_next_client(self) -> Optional[T]:
"""获取下一个客户端(轮询)"""
if not self.is_available():
return None
return next(self.client_cycle)
def get_client_count(self) -> int:
"""获取可用客户端数量"""
return len(self.clients)
def create_api_key_manager_from_config(
config_keys: Optional[List[str]],
client_factory: Callable[[str], T],
service_name: str
) -> APIKeyManager[T]:
"""
从配置创建API密钥管理器的便捷函数
Args:
config_keys: 从配置读取的API密钥列表
client_factory: 客户端工厂函数
service_name: 服务名称
Returns:
API密钥管理器实例
"""
api_keys = config_keys if isinstance(config_keys, list) else []
return APIKeyManager(api_keys, client_factory, service_name)
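A small round-robin example with a dummy client factory standing in for a real SDK constructor (the plugin itself passes e.g. lambda key: Exa(api_key=key)); the key values are placeholders.

manager = create_api_key_manager_from_config(
    ["key-a", "key-b", "None"],      # the literal "None" entry is filtered out as invalid
    lambda key: {"api_key": key},    # dummy client: a dict wrapping the key
    "Demo",
)
assert manager.is_available() and manager.get_client_count() == 2
rotation = [manager.get_next_client()["api_key"] for _ in range(3)]
assert rotation == ["key-a", "key-b", "key-a"]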

View File

@@ -0,0 +1,57 @@
"""
Formatters for web search results
"""
from typing import List, Dict, Any
def format_search_results(results: List[Dict[str, Any]]) -> str:
"""
格式化搜索结果为字符串
"""
if not results:
return "没有找到相关的网络信息。"
formatted_string = "根据网络搜索结果:\n\n"
for i, res in enumerate(results, 1):
title = res.get("title", '无标题')
url = res.get("url", '#')
snippet = res.get("snippet", '无摘要')
provider = res.get("provider", "未知来源")
formatted_string += f"{i}. **{title}** (来自: {provider})\n"
formatted_string += f" - 摘要: {snippet}\n"
formatted_string += f" - 来源: {url}\n\n"
return formatted_string
def format_url_parse_results(results: List[Dict[str, Any]]) -> str:
"""
将成功解析的URL结果列表格式化为一段简洁的文本。
"""
formatted_parts = []
for res in results:
title = res.get('title', '无标题')
url = res.get('url', '#')
snippet = res.get('snippet', '无摘要')
source = res.get('source', '未知')
formatted_string = f"**{title}**\n"
formatted_string += f"**内容摘要**:\n{snippet}\n"
formatted_string += f"**来源**: {url} (由 {source} 解析)\n"
formatted_parts.append(formatted_string)
return "\n---\n".join(formatted_parts)
def deduplicate_results(results: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
"""
根据URL去重搜索结果
"""
unique_urls = set()
unique_results = []
for res in results:
if isinstance(res, dict) and res.get("url") and res["url"] not in unique_urls:
unique_urls.add(res["url"])
unique_results.append(res)
return unique_results
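A quick check of the helpers above, using result dicts shaped like those produced by the search engines and the URL parser (all values are placeholders):

hits = [
    {"title": "Example", "url": "https://example.com", "snippet": "first copy", "provider": "ddg"},
    {"title": "Example", "url": "https://example.com", "snippet": "duplicate", "provider": "bing"},
    {"title": "Other", "url": "https://example.org", "snippet": "second hit", "provider": "exa"},
]
unique = deduplicate_results(hits)     # first entry per URL wins -> 2 results
print(format_search_results(unique))   # numbered list with title, snippet and source
print(format_url_parse_results([
    {"title": "Doc", "url": "https://example.org/doc", "snippet": "short summary", "source": "local"},
]))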

View File

@@ -0,0 +1,39 @@
"""
URL processing utilities
"""
import re
from typing import List
def parse_urls_from_input(urls_input) -> List[str]:
"""
从输入中解析URL列表
"""
if isinstance(urls_input, str):
# 如果是字符串尝试解析为URL列表
# 提取所有HTTP/HTTPS URL
url_pattern = r'https?://[^\s\],]+'
urls = re.findall(url_pattern, urls_input)
if not urls:
# 如果没有找到标准URL将整个字符串作为单个URL
if urls_input.strip().startswith(('http://', 'https://')):
urls = [urls_input.strip()]
else:
return []
elif isinstance(urls_input, list):
urls = [url.strip() for url in urls_input if isinstance(url, str) and url.strip()]
else:
return []
return urls
def validate_urls(urls: List[str]) -> List[str]:
"""
验证URL格式返回有效的URL列表
"""
valid_urls = []
for url in urls:
if url.startswith(('http://', 'https://')):
valid_urls.append(url)
return valid_urls
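Example behaviour of the two helpers on a free-form string and a mixed list (URLs are placeholders):

text = "please read https://example.com/a, https://example.org/b before replying"
urls = parse_urls_from_input(text)
# -> ["https://example.com/a", "https://example.org/b"]; commas, whitespace and "]" end a match
print(validate_urls(urls + ["ftp://host/file", "not-a-url"]))
# -> only the http(s) URLs survive validation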