搜索重构,增加轮询等多种模式

你知道吗?其实雅诺狐狐的耳朵很软很好rua
This commit is contained in:
雅诺狐
2025-08-13 18:32:18 +08:00
parent 69483173e6
commit 678e2a89f6
2 changed files with 232 additions and 43 deletions

View File

@@ -1,9 +1,11 @@
import asyncio
import functools
import itertools
from typing import Any, Dict, List
from datetime import datetime, timedelta
from exa_py import Exa
from asyncddgs import aDDGS
from tavily import TavilyClient
from src.common.logger import get_logger
from typing import Tuple,Type
@@ -31,33 +33,94 @@ class WebSurfingTool(BaseTool):
parameters = [
("query", ToolParamType.STRING, "要搜索的关键词或问题。", True, None),
("num_results", ToolParamType.INTEGER, "期望每个搜索引擎返回的搜索结果数量默认为5。", False, None),
("time_range", ToolParamType.STRING, "指定搜索的时间范围,可以是 'any', 'week', 'month'。默认为 'any'", False, ["any", "week", "month"])
("time_range", ToolParamType.STRING, "指定搜索的时间范围,可以是 'any', 'week', 'month'。默认为 'any'", False, ["any", "week", "month"])
] # type: ignore
def __init__(self, plugin_config=None):
super().__init__(plugin_config)
# 从主配置文件读取EXA API密钥
EXA_API_KEY = config_api.get_global_config("exa.api_key", None)
# 确保API key是字符串类型
if EXA_API_KEY and isinstance(EXA_API_KEY, str) and EXA_API_KEY.strip() != "None":
self.exa = Exa(api_key=str(EXA_API_KEY).strip())
# 初始化EXA API密钥轮询器
self.exa_clients = []
self.exa_key_cycle = None
# 优先从主配置文件读取,如果没有则从插件配置文件读取
EXA_API_KEYS = config_api.get_global_config("exa.api_keys", None)
if EXA_API_KEYS is None:
# 从插件配置文件读取
EXA_API_KEYS = self.get_config("exa.api_keys", [])
if isinstance(EXA_API_KEYS, list) and EXA_API_KEYS:
valid_keys = [key.strip() for key in EXA_API_KEYS if isinstance(key, str) and key.strip() not in ("None", "")]
if valid_keys:
self.exa_clients = [Exa(api_key=key) for key in valid_keys]
self.exa_key_cycle = itertools.cycle(self.exa_clients)
logger.info(f"已配置 {len(valid_keys)} 个 Exa API 密钥")
else:
logger.warning("Exa API Keys 配置无效Exa 搜索功能将不可用。")
else:
self.exa = None
logger.warning("Exa API Keys 未配置Exa 搜索功能将不可用。")
if not self.exa:
logger.warning("Exa API Key 未配置Exa 搜索功能将不可用。")
# 初始化Tavily API密钥轮询器
self.tavily_clients = []
self.tavily_key_cycle = None
# 优先从主配置文件读取,如果没有则从插件配置文件读取
TAVILY_API_KEYS = config_api.get_global_config("tavily.api_keys", None)
if TAVILY_API_KEYS is None:
# 从插件配置文件读取
TAVILY_API_KEYS = self.get_config("tavily.api_keys", [])
if isinstance(TAVILY_API_KEYS, list) and TAVILY_API_KEYS:
valid_keys = [key.strip() for key in TAVILY_API_KEYS if isinstance(key, str) and key.strip() not in ("None", "")]
if valid_keys:
self.tavily_clients = [TavilyClient(api_key=key) for key in valid_keys]
self.tavily_key_cycle = itertools.cycle(self.tavily_clients)
logger.info(f"已配置 {len(valid_keys)} 个 Tavily API 密钥")
else:
logger.warning("Tavily API Keys 配置无效Tavily 搜索功能将不可用。")
else:
logger.warning("Tavily API Keys 未配置Tavily 搜索功能将不可用。")
async def execute(self, function_args: Dict[str, Any]) -> Dict[str, Any]:
query = function_args.get("query")
if not query:
return {"error": "搜索查询不能为空。"}
logger.info(f"开始并行搜索,参数: '{function_args}'")
# 读取搜索配置
enabled_engines = config_api.get_global_config("web_search.enabled_engines", ["ddg"])
search_strategy = config_api.get_global_config("web_search.search_strategy", "single")
logger.info(f"开始搜索,策略: {search_strategy}, 启用引擎: {enabled_engines}, 参数: '{function_args}'")
# 根据策略执行搜索
if search_strategy == "parallel":
return await self._execute_parallel_search(function_args, enabled_engines)
elif search_strategy == "fallback":
return await self._execute_fallback_search(function_args, enabled_engines)
else: # single
return await self._execute_single_search(function_args, enabled_engines)
async def _execute_parallel_search(self, function_args: Dict[str, Any], enabled_engines: List[str]) -> Dict[str, Any]:
"""并行搜索策略:同时使用所有启用的搜索引擎"""
search_tasks = []
if self.exa:
search_tasks.append(self._search_exa(function_args))
search_tasks.append(self._search_ddg(function_args))
for engine in enabled_engines:
if engine == "exa" and self.exa_clients:
# 使用参数中的数量如果没有则默认5个
custom_args = function_args.copy()
custom_args["num_results"] = custom_args.get("num_results", 5)
search_tasks.append(self._search_exa(custom_args))
elif engine == "tavily" and self.tavily_clients:
custom_args = function_args.copy()
custom_args["num_results"] = custom_args.get("num_results", 5)
search_tasks.append(self._search_tavily(custom_args))
elif engine == "ddg":
custom_args = function_args.copy()
custom_args["num_results"] = custom_args.get("num_results", 5)
search_tasks.append(self._search_ddg(custom_args))
if not search_tasks:
return {"error": "没有可用的搜索引擎。"}
try:
search_results_lists = await asyncio.gather(*search_tasks, return_exceptions=True)
@@ -73,17 +136,72 @@ class WebSurfingTool(BaseTool):
unique_results = self._deduplicate_results(all_results)
formatted_content = self._format_results(unique_results)
result_package = {
return {
"type": "web_search_result",
"content": formatted_content,
}
return result_package
except Exception as e:
logger.error(f"执行并行网络搜索时发生异常: {e}", exc_info=True)
return {"error": f"执行网络搜索时发生严重错误: {str(e)}"}
async def _execute_fallback_search(self, function_args: Dict[str, Any], enabled_engines: List[str]) -> Dict[str, Any]:
"""回退搜索策略:按顺序尝试搜索引擎,失败则尝试下一个"""
for engine in enabled_engines:
try:
custom_args = function_args.copy()
custom_args["num_results"] = custom_args.get("num_results", 5)
if engine == "exa" and self.exa_clients:
results = await self._search_exa(custom_args)
elif engine == "tavily" and self.tavily_clients:
results = await self._search_tavily(custom_args)
elif engine == "ddg":
results = await self._search_ddg(custom_args)
else:
continue
if results: # 如果有结果,直接返回
formatted_content = self._format_results(results)
return {
"type": "web_search_result",
"content": formatted_content,
}
except Exception as e:
logger.warning(f"{engine} 搜索失败,尝试下一个引擎: {e}")
continue
return {"error": "所有搜索引擎都失败了。"}
async def _execute_single_search(self, function_args: Dict[str, Any], enabled_engines: List[str]) -> Dict[str, Any]:
"""单一搜索策略:只使用第一个可用的搜索引擎"""
for engine in enabled_engines:
custom_args = function_args.copy()
custom_args["num_results"] = custom_args.get("num_results", 5)
try:
if engine == "exa" and self.exa_clients:
results = await self._search_exa(custom_args)
elif engine == "tavily" and self.tavily_clients:
results = await self._search_tavily(custom_args)
elif engine == "ddg":
results = await self._search_ddg(custom_args)
else:
continue
formatted_content = self._format_results(results)
return {
"type": "web_search_result",
"content": formatted_content,
}
except Exception as e:
logger.error(f"{engine} 搜索失败: {e}")
return {"error": f"{engine} 搜索失败: {str(e)}"}
return {"error": "没有可用的搜索引擎。"}
def _deduplicate_results(self, results: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
unique_urls = set()
unique_results = []
@@ -105,10 +223,13 @@ class WebSurfingTool(BaseTool):
exa_args["start_published_date"] = start_date.strftime('%Y-%m-%d')
try:
if not self.exa:
if not self.exa_key_cycle:
return []
# 使用轮询机制获取下一个客户端
exa_client = next(self.exa_key_cycle)
loop = asyncio.get_running_loop()
func = functools.partial(self.exa.search_and_contents, query, **exa_args)
func = functools.partial(exa_client.search_and_contents, query, **exa_args)
search_response = await loop.run_in_executor(None, func)
return [
@@ -124,6 +245,53 @@ class WebSurfingTool(BaseTool):
logger.error(f"Exa 搜索失败: {e}")
return []
async def _search_tavily(self, args: Dict[str, Any]) -> List[Dict[str, Any]]:
query = args["query"]
num_results = args.get("num_results", 3)
time_range = args.get("time_range", "any")
try:
if not self.tavily_key_cycle:
return []
# 使用轮询机制获取下一个客户端
tavily_client = next(self.tavily_key_cycle)
# 构建Tavily搜索参数
search_params = {
"query": query,
"max_results": num_results,
"search_depth": "basic",
"include_answer": False,
"include_raw_content": False
}
# 根据时间范围调整搜索参数
if time_range == "week":
search_params["days"] = 7
elif time_range == "month":
search_params["days"] = 30
loop = asyncio.get_running_loop()
func = functools.partial(tavily_client.search, **search_params)
search_response = await loop.run_in_executor(None, func)
results = []
if search_response and "results" in search_response:
for res in search_response["results"]:
results.append({
"title": res.get("title", "无标题"),
"url": res.get("url", ""),
"snippet": res.get("content", "")[:300] + "..." if res.get("content") else "无摘要",
"provider": "Tavily"
})
return results
except Exception as e:
logger.error(f"Tavily 搜索失败: {e}")
return []
async def _search_ddg(self, args: Dict[str, Any]) -> List[Dict[str, Any]]:
query = args["query"]
num_results = args.get("num_results", 3)
@@ -174,16 +342,27 @@ class URLParserTool(BaseTool):
]
def __init__(self, plugin_config=None):
super().__init__(plugin_config)
# 从主配置文件读取EXA API密钥
EXA_API_KEY = config_api.get_global_config("exa.api_key", None)
# 确保API key是字符串类型
if (not EXA_API_KEY or
not isinstance(EXA_API_KEY, str) or
EXA_API_KEY.strip() in ("YOUR_API_KEY_HERE", "None", "")):
self.exa = None
logger.error("Exa API Key 未配置URL解析功能将受限。")
# 初始化EXA API密钥轮询器
self.exa_clients = []
self.exa_key_cycle = None
# 优先从主配置文件读取,如果没有则从插件配置文件读取
EXA_API_KEYS = config_api.get_global_config("exa.api_keys", None)
if EXA_API_KEYS is None:
# 从插件配置文件读取
EXA_API_KEYS = self.get_config("exa.api_keys", [])
if isinstance(EXA_API_KEYS, list) and EXA_API_KEYS:
valid_keys = [key.strip() for key in EXA_API_KEYS if isinstance(key, str) and key.strip() not in ("None", "")]
if valid_keys:
self.exa_clients = [Exa(api_key=key) for key in valid_keys]
self.exa_key_cycle = itertools.cycle(self.exa_clients)
logger.info(f"URL解析工具已配置 {len(valid_keys)} 个 Exa API 密钥")
else:
logger.warning("Exa API Keys 配置无效URL解析功能将受限。")
else:
self.exa = Exa(api_key=str(EXA_API_KEY).strip())
logger.warning("Exa API Keys 未配置URL解析功能将受限。")
async def _local_parse_and_summarize(self, url: str) -> Dict[str, Any]:
"""
使用本地库(httpx, BeautifulSoup)解析URL并调用LLM进行总结。
@@ -312,12 +491,14 @@ class URLParserTool(BaseTool):
# 步骤 1: 尝试使用 Exa API 进行解析
contents_response = None
if self.exa:
if self.exa_key_cycle:
logger.info(f"开始使用 Exa API 解析URL: {urls}")
try:
# 使用轮询机制获取下一个客户端
exa_client = next(self.exa_key_cycle)
loop = asyncio.get_running_loop()
exa_params = {"text": True, "summary": True, "highlights": True}
func = functools.partial(self.exa.get_contents, urls, **exa_params)
func = functools.partial(exa_client.get_contents, urls, **exa_params)
contents_response = await loop.run_in_executor(None, func)
except Exception as e:
logger.error(f"执行 Exa URL解析时发生严重异常: {e}", exc_info=True)
@@ -414,10 +595,15 @@ class WEBSEARCHPLUGIN(BasePlugin):
optional=False
),
PythonDependency(
package_name="exa_py",
package_name="exa_py",
description="Exa搜索API客户端库",
optional=True # 如果没有API密钥这个是可选的
),
PythonDependency(
package_name="tavily-python",
description="Tavily搜索API客户端库",
optional=True # 如果没有API密钥这个是可选的
),
PythonDependency(
package_name="httpx",
version=">=0.20.0",

View File

@@ -1,5 +1,5 @@
[inner]
version = "6.2.9"
version = "6.3.0"
#----以下是给开发人员阅读的,如果你只是部署了麦麦,不需要阅读----
#如果你想要修改配置文件请递增version的值
@@ -11,8 +11,7 @@ version = "6.2.9"
# 修订号:配置文件内容小更新
#----以上是给开发人员阅读的,如果你只是部署了麦麦,不需要阅读----
[database]
# 数据库配置
[database]# 数据库配置
database_type = "sqlite" # 数据库类型,支持 "sqlite" 或 "mysql"
# SQLite 配置(当 database_type = "sqlite" 时使用)
@@ -153,7 +152,7 @@ ban_msgs_regex = [
willing_mode = "classical" # 回复意愿模式 —— 经典模式classicalmxp模式mxp自定义模式custom需要你自己实现
[tool]
enable_tool = false # 是否在普通聊天中启用工具
enable_tool = true # 是否在普通聊天中启用工具
[mood]
enable_mood = true # 是否启用情绪系统
@@ -326,14 +325,18 @@ batch_analysis_prompt = """请分析这个视频的内容。这些图片是从
请用中文回答,分析要详细准确。"""
# EXA搜索引擎配置
[exa]
# EXA API密钥用于联网搜索功能
api_key = "None" # 请填入有效的EXA API密钥
# 联网搜索组件配置
[exa] # EXA API密钥列表支持轮询机制
api_keys = ["None"]
[tavily] # Tavily API密钥列表支持轮询机制
api_keys = ["None"]
# 网络搜索组件配置
[web_search]
# 是否启用联网搜索工具
enable_web_search_tool = true
# 是否启用URL解析工具
enable_url_tool = true
enable_web_search_tool = true # 是否启用联网搜索tool
enable_url_tool = true # 是否启用URL解析tool
# 搜索引擎配置
enabled_engines = ["ddg"] # 启用的搜索引擎列表,可选: "exa", "tavily", "ddg"
search_strategy = "single" # 搜索策略: "single"(使用第一个可用引擎), "parallel"(并行使用所有启用的引擎), "fallback"(按顺序尝试,失败则尝试下一个)