feat: "搜索重构,增加轮询等多种模式"

你知道吗?其实雅诺狐狐的耳朵很软很好rua
This commit is contained in:
雅诺狐
2025-08-13 18:35:45 +08:00
parent 678e2a89f6
commit 631af5d6fd
2 changed files with 45 additions and 234 deletions

View File

@@ -1,11 +1,9 @@
import asyncio import asyncio
import functools import functools
import itertools
from typing import Any, Dict, List from typing import Any, Dict, List
from datetime import datetime, timedelta from datetime import datetime, timedelta
from exa_py import Exa from exa_py import Exa
from asyncddgs import aDDGS from asyncddgs import aDDGS
from tavily import TavilyClient
from src.common.logger import get_logger from src.common.logger import get_logger
from typing import Tuple,Type from typing import Tuple,Type
@@ -38,89 +36,28 @@ class WebSurfingTool(BaseTool):
def __init__(self, plugin_config=None): def __init__(self, plugin_config=None):
super().__init__(plugin_config) super().__init__(plugin_config)
# 从主配置文件读取EXA API密钥
# 初始化EXA API密钥轮询器 EXA_API_KEY = config_api.get_global_config("exa.api_key", None)
self.exa_clients = [] # 确保API key是字符串类型
self.exa_key_cycle = None if EXA_API_KEY and isinstance(EXA_API_KEY, str) and EXA_API_KEY.strip() != "None":
self.exa = Exa(api_key=str(EXA_API_KEY).strip())
# 优先从主配置文件读取,如果没有则从插件配置文件读取
EXA_API_KEYS = config_api.get_global_config("exa.api_keys", None)
if EXA_API_KEYS is None:
# 从插件配置文件读取
EXA_API_KEYS = self.get_config("exa.api_keys", [])
if isinstance(EXA_API_KEYS, list) and EXA_API_KEYS:
valid_keys = [key.strip() for key in EXA_API_KEYS if isinstance(key, str) and key.strip() not in ("None", "")]
if valid_keys:
self.exa_clients = [Exa(api_key=key) for key in valid_keys]
self.exa_key_cycle = itertools.cycle(self.exa_clients)
logger.info(f"已配置 {len(valid_keys)} 个 Exa API 密钥")
else:
logger.warning("Exa API Keys 配置无效Exa 搜索功能将不可用。")
else: else:
logger.warning("Exa API Keys 未配置Exa 搜索功能将不可用。") self.exa = None
# 初始化Tavily API密钥轮询器 if not self.exa:
self.tavily_clients = [] logger.warning("Exa API Key 未配置Exa 搜索功能将不可用。")
self.tavily_key_cycle = None
# 优先从主配置文件读取,如果没有则从插件配置文件读取
TAVILY_API_KEYS = config_api.get_global_config("tavily.api_keys", None)
if TAVILY_API_KEYS is None:
# 从插件配置文件读取
TAVILY_API_KEYS = self.get_config("tavily.api_keys", [])
if isinstance(TAVILY_API_KEYS, list) and TAVILY_API_KEYS:
valid_keys = [key.strip() for key in TAVILY_API_KEYS if isinstance(key, str) and key.strip() not in ("None", "")]
if valid_keys:
self.tavily_clients = [TavilyClient(api_key=key) for key in valid_keys]
self.tavily_key_cycle = itertools.cycle(self.tavily_clients)
logger.info(f"已配置 {len(valid_keys)} 个 Tavily API 密钥")
else:
logger.warning("Tavily API Keys 配置无效Tavily 搜索功能将不可用。")
else:
logger.warning("Tavily API Keys 未配置Tavily 搜索功能将不可用。")
async def execute(self, function_args: Dict[str, Any]) -> Dict[str, Any]: async def execute(self, function_args: Dict[str, Any]) -> Dict[str, Any]:
query = function_args.get("query") query = function_args.get("query")
if not query: if not query:
return {"error": "搜索查询不能为空。"} return {"error": "搜索查询不能为空。"}
# 读取搜索配置 logger.info(f"开始并行搜索,参数: '{function_args}'")
enabled_engines = config_api.get_global_config("web_search.enabled_engines", ["ddg"])
search_strategy = config_api.get_global_config("web_search.search_strategy", "single")
logger.info(f"开始搜索,策略: {search_strategy}, 启用引擎: {enabled_engines}, 参数: '{function_args}'")
# 根据策略执行搜索
if search_strategy == "parallel":
return await self._execute_parallel_search(function_args, enabled_engines)
elif search_strategy == "fallback":
return await self._execute_fallback_search(function_args, enabled_engines)
else: # single
return await self._execute_single_search(function_args, enabled_engines)
async def _execute_parallel_search(self, function_args: Dict[str, Any], enabled_engines: List[str]) -> Dict[str, Any]:
"""并行搜索策略:同时使用所有启用的搜索引擎"""
search_tasks = [] search_tasks = []
if self.exa:
for engine in enabled_engines: search_tasks.append(self._search_exa(function_args))
if engine == "exa" and self.exa_clients: search_tasks.append(self._search_ddg(function_args))
# 使用参数中的数量如果没有则默认5个
custom_args = function_args.copy()
custom_args["num_results"] = custom_args.get("num_results", 5)
search_tasks.append(self._search_exa(custom_args))
elif engine == "tavily" and self.tavily_clients:
custom_args = function_args.copy()
custom_args["num_results"] = custom_args.get("num_results", 5)
search_tasks.append(self._search_tavily(custom_args))
elif engine == "ddg":
custom_args = function_args.copy()
custom_args["num_results"] = custom_args.get("num_results", 5)
search_tasks.append(self._search_ddg(custom_args))
if not search_tasks:
return {"error": "没有可用的搜索引擎。"}
try: try:
search_results_lists = await asyncio.gather(*search_tasks, return_exceptions=True) search_results_lists = await asyncio.gather(*search_tasks, return_exceptions=True)
@@ -136,72 +73,17 @@ class WebSurfingTool(BaseTool):
unique_results = self._deduplicate_results(all_results) unique_results = self._deduplicate_results(all_results)
formatted_content = self._format_results(unique_results) formatted_content = self._format_results(unique_results)
return { result_package = {
"type": "web_search_result", "type": "web_search_result",
"content": formatted_content, "content": formatted_content,
} }
return result_package
except Exception as e: except Exception as e:
logger.error(f"执行并行网络搜索时发生异常: {e}", exc_info=True) logger.error(f"执行并行网络搜索时发生异常: {e}", exc_info=True)
return {"error": f"执行网络搜索时发生严重错误: {str(e)}"} return {"error": f"执行网络搜索时发生严重错误: {str(e)}"}
async def _execute_fallback_search(self, function_args: Dict[str, Any], enabled_engines: List[str]) -> Dict[str, Any]:
"""回退搜索策略:按顺序尝试搜索引擎,失败则尝试下一个"""
for engine in enabled_engines:
try:
custom_args = function_args.copy()
custom_args["num_results"] = custom_args.get("num_results", 5)
if engine == "exa" and self.exa_clients:
results = await self._search_exa(custom_args)
elif engine == "tavily" and self.tavily_clients:
results = await self._search_tavily(custom_args)
elif engine == "ddg":
results = await self._search_ddg(custom_args)
else:
continue
if results: # 如果有结果,直接返回
formatted_content = self._format_results(results)
return {
"type": "web_search_result",
"content": formatted_content,
}
except Exception as e:
logger.warning(f"{engine} 搜索失败,尝试下一个引擎: {e}")
continue
return {"error": "所有搜索引擎都失败了。"}
async def _execute_single_search(self, function_args: Dict[str, Any], enabled_engines: List[str]) -> Dict[str, Any]:
"""单一搜索策略:只使用第一个可用的搜索引擎"""
for engine in enabled_engines:
custom_args = function_args.copy()
custom_args["num_results"] = custom_args.get("num_results", 5)
try:
if engine == "exa" and self.exa_clients:
results = await self._search_exa(custom_args)
elif engine == "tavily" and self.tavily_clients:
results = await self._search_tavily(custom_args)
elif engine == "ddg":
results = await self._search_ddg(custom_args)
else:
continue
formatted_content = self._format_results(results)
return {
"type": "web_search_result",
"content": formatted_content,
}
except Exception as e:
logger.error(f"{engine} 搜索失败: {e}")
return {"error": f"{engine} 搜索失败: {str(e)}"}
return {"error": "没有可用的搜索引擎。"}
def _deduplicate_results(self, results: List[Dict[str, Any]]) -> List[Dict[str, Any]]: def _deduplicate_results(self, results: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
unique_urls = set() unique_urls = set()
unique_results = [] unique_results = []
@@ -223,13 +105,10 @@ class WebSurfingTool(BaseTool):
exa_args["start_published_date"] = start_date.strftime('%Y-%m-%d') exa_args["start_published_date"] = start_date.strftime('%Y-%m-%d')
try: try:
if not self.exa_key_cycle: if not self.exa:
return [] return []
# 使用轮询机制获取下一个客户端
exa_client = next(self.exa_key_cycle)
loop = asyncio.get_running_loop() loop = asyncio.get_running_loop()
func = functools.partial(exa_client.search_and_contents, query, **exa_args) func = functools.partial(self.exa.search_and_contents, query, **exa_args)
search_response = await loop.run_in_executor(None, func) search_response = await loop.run_in_executor(None, func)
return [ return [
@@ -245,53 +124,6 @@ class WebSurfingTool(BaseTool):
logger.error(f"Exa 搜索失败: {e}") logger.error(f"Exa 搜索失败: {e}")
return [] return []
async def _search_tavily(self, args: Dict[str, Any]) -> List[Dict[str, Any]]:
query = args["query"]
num_results = args.get("num_results", 3)
time_range = args.get("time_range", "any")
try:
if not self.tavily_key_cycle:
return []
# 使用轮询机制获取下一个客户端
tavily_client = next(self.tavily_key_cycle)
# 构建Tavily搜索参数
search_params = {
"query": query,
"max_results": num_results,
"search_depth": "basic",
"include_answer": False,
"include_raw_content": False
}
# 根据时间范围调整搜索参数
if time_range == "week":
search_params["days"] = 7
elif time_range == "month":
search_params["days"] = 30
loop = asyncio.get_running_loop()
func = functools.partial(tavily_client.search, **search_params)
search_response = await loop.run_in_executor(None, func)
results = []
if search_response and "results" in search_response:
for res in search_response["results"]:
results.append({
"title": res.get("title", "无标题"),
"url": res.get("url", ""),
"snippet": res.get("content", "")[:300] + "..." if res.get("content") else "无摘要",
"provider": "Tavily"
})
return results
except Exception as e:
logger.error(f"Tavily 搜索失败: {e}")
return []
async def _search_ddg(self, args: Dict[str, Any]) -> List[Dict[str, Any]]: async def _search_ddg(self, args: Dict[str, Any]) -> List[Dict[str, Any]]:
query = args["query"] query = args["query"]
num_results = args.get("num_results", 3) num_results = args.get("num_results", 3)
@@ -342,27 +174,16 @@ class URLParserTool(BaseTool):
] ]
def __init__(self, plugin_config=None): def __init__(self, plugin_config=None):
super().__init__(plugin_config) super().__init__(plugin_config)
# 从主配置文件读取EXA API密钥
# 初始化EXA API密钥轮询器 EXA_API_KEY = config_api.get_global_config("exa.api_key", None)
self.exa_clients = [] # 确保API key是字符串类型
self.exa_key_cycle = None if (not EXA_API_KEY or
not isinstance(EXA_API_KEY, str) or
# 优先从主配置文件读取,如果没有则从插件配置文件读取 EXA_API_KEY.strip() in ("YOUR_API_KEY_HERE", "None", "")):
EXA_API_KEYS = config_api.get_global_config("exa.api_keys", None) self.exa = None
if EXA_API_KEYS is None: logger.error("Exa API Key 未配置URL解析功能将受限。")
# 从插件配置文件读取
EXA_API_KEYS = self.get_config("exa.api_keys", [])
if isinstance(EXA_API_KEYS, list) and EXA_API_KEYS:
valid_keys = [key.strip() for key in EXA_API_KEYS if isinstance(key, str) and key.strip() not in ("None", "")]
if valid_keys:
self.exa_clients = [Exa(api_key=key) for key in valid_keys]
self.exa_key_cycle = itertools.cycle(self.exa_clients)
logger.info(f"URL解析工具已配置 {len(valid_keys)} 个 Exa API 密钥")
else:
logger.warning("Exa API Keys 配置无效URL解析功能将受限。")
else: else:
logger.warning("Exa API Keys 未配置URL解析功能将受限。") self.exa = Exa(api_key=str(EXA_API_KEY).strip())
async def _local_parse_and_summarize(self, url: str) -> Dict[str, Any]: async def _local_parse_and_summarize(self, url: str) -> Dict[str, Any]:
""" """
使用本地库(httpx, BeautifulSoup)解析URL并调用LLM进行总结。 使用本地库(httpx, BeautifulSoup)解析URL并调用LLM进行总结。
@@ -491,14 +312,12 @@ class URLParserTool(BaseTool):
# 步骤 1: 尝试使用 Exa API 进行解析 # 步骤 1: 尝试使用 Exa API 进行解析
contents_response = None contents_response = None
if self.exa_key_cycle: if self.exa:
logger.info(f"开始使用 Exa API 解析URL: {urls}") logger.info(f"开始使用 Exa API 解析URL: {urls}")
try: try:
# 使用轮询机制获取下一个客户端
exa_client = next(self.exa_key_cycle)
loop = asyncio.get_running_loop() loop = asyncio.get_running_loop()
exa_params = {"text": True, "summary": True, "highlights": True} exa_params = {"text": True, "summary": True, "highlights": True}
func = functools.partial(exa_client.get_contents, urls, **exa_params) func = functools.partial(self.exa.get_contents, urls, **exa_params)
contents_response = await loop.run_in_executor(None, func) contents_response = await loop.run_in_executor(None, func)
except Exception as e: except Exception as e:
logger.error(f"执行 Exa URL解析时发生严重异常: {e}", exc_info=True) logger.error(f"执行 Exa URL解析时发生严重异常: {e}", exc_info=True)
@@ -599,11 +418,6 @@ class WEBSEARCHPLUGIN(BasePlugin):
description="Exa搜索API客户端库", description="Exa搜索API客户端库",
optional=True # 如果没有API密钥这个是可选的 optional=True # 如果没有API密钥这个是可选的
), ),
PythonDependency(
package_name="tavily-python",
description="Tavily搜索API客户端库",
optional=True # 如果没有API密钥这个是可选的
),
PythonDependency( PythonDependency(
package_name="httpx", package_name="httpx",
version=">=0.20.0", version=">=0.20.0",
@@ -644,8 +458,8 @@ class WEBSEARCHPLUGIN(BasePlugin):
def get_plugin_components(self) -> List[Tuple[ComponentInfo, Type]]: def get_plugin_components(self) -> List[Tuple[ComponentInfo, Type]]:
enable_tool =[] enable_tool =[]
# 从主配置文件读取组件启用配置 # 从主配置文件读取组件启用配置
if config_api.get_global_config("web_search.enable_web_search_tool", True): if config_api.get_global_config.web_search.enable_web_search_tool:
enable_tool.append((WebSurfingTool.get_tool_info(), WebSurfingTool)) enable_tool.append((WebSurfingTool.get_tool_info(), WebSurfingTool))
if config_api.get_global_config("web_search.enable_url_tool", True): if config_api.get_global_config.web_search.enable_url_tool:
enable_tool.append((URLParserTool.get_tool_info(), URLParserTool)) enable_tool.append((URLParserTool.get_tool_info(), URLParserTool))
return enable_tool return enable_tool

View File

@@ -1,5 +1,5 @@
[inner] [inner]
version = "6.3.0" version = "6.2.9"
#----以下是给开发人员阅读的,如果你只是部署了麦麦,不需要阅读---- #----以下是给开发人员阅读的,如果你只是部署了麦麦,不需要阅读----
#如果你想要修改配置文件,请递增version的值 #如果你想要修改配置文件,请递增version的值
@@ -11,7 +11,8 @@ version = "6.3.0"
# 修订号:配置文件内容小更新 # 修订号:配置文件内容小更新
#----以上是给开发人员阅读的,如果你只是部署了麦麦,不需要阅读---- #----以上是给开发人员阅读的,如果你只是部署了麦麦,不需要阅读----
[database]# 数据库配置 [database]
# 数据库配置
database_type = "sqlite" # 数据库类型,支持 "sqlite" 或 "mysql" database_type = "sqlite" # 数据库类型,支持 "sqlite" 或 "mysql"
# SQLite 配置(当 database_type = "sqlite" 时使用) # SQLite 配置(当 database_type = "sqlite" 时使用)
@@ -152,7 +153,7 @@ ban_msgs_regex = [
willing_mode = "classical" # 回复意愿模式 —— 经典模式:classical,mxp模式:mxp,自定义模式:custom(需要你自己实现) willing_mode = "classical" # 回复意愿模式 —— 经典模式:classical,mxp模式:mxp,自定义模式:custom(需要你自己实现)
[tool] [tool]
enable_tool = true # 是否在普通聊天中启用工具 enable_tool = false # 是否在普通聊天中启用工具
[mood] [mood]
enable_mood = true # 是否启用情绪系统 enable_mood = true # 是否启用情绪系统
@@ -325,18 +326,14 @@ batch_analysis_prompt = """请分析这个视频的内容。这些图片是从
请用中文回答,分析要详细准确。""" 请用中文回答,分析要详细准确。"""
# EXA搜索引擎配置
[exa]
# EXA API密钥用于联网搜索功能
api_key = "None" # 请填入有效的EXA API密钥
[exa] # EXA API密钥列表,支持轮询机制 # 联网搜索组件配置
api_keys = ["None"]
[tavily] # Tavily API密钥列表,支持轮询机制
api_keys = ["None"]
# 网络搜索组件配置
[web_search] [web_search]
enable_web_search_tool = true # 是否启用联网搜索tool # 是否启用联网搜索工具
enable_url_tool = true # 是否启用URL解析tool enable_web_search_tool = true
# 是否启用URL解析工具
# 搜索引擎配置 enable_url_tool = true
enabled_engines = ["ddg"] # 启用的搜索引擎列表,可选: "exa", "tavily", "ddg"
search_strategy = "single" # 搜索策略: "single"(使用第一个可用引擎), "parallel"(并行使用所有启用的引擎), "fallback"(按顺序尝试,失败则尝试下一个)