From fa353bf9d100c4d3e6707012b69ccb959bf58194 Mon Sep 17 00:00:00 2001 From: Windpicker-owo <3431391539@qq.com> Date: Thu, 6 Nov 2025 13:11:54 +0800 Subject: [PATCH] =?UTF-8?q?feat(web=5Fsearch):=20=E6=B7=BB=E5=8A=A0?= =?UTF-8?q?=E7=AD=94=E6=A1=88=E6=A8=A1=E5=BC=8F=E6=94=AF=E6=8C=81=EF=BC=8C?= =?UTF-8?q?=E4=BC=98=E5=8C=96Exa=E6=90=9C=E7=B4=A2=E5=BC=95=E6=93=8E?= =?UTF-8?q?=E7=9A=84=E7=BB=93=E6=9E=9C=E5=A4=84=E7=90=86?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../web_search_tool/engines/exa_engine.py | 98 +++++++++++++++++-- .../web_search_tool/tools/web_search.py | 47 +++++++-- 2 files changed, 126 insertions(+), 19 deletions(-) diff --git a/src/plugins/built_in/web_search_tool/engines/exa_engine.py b/src/plugins/built_in/web_search_tool/engines/exa_engine.py index 37655eb53..e09232249 100644 --- a/src/plugins/built_in/web_search_tool/engines/exa_engine.py +++ b/src/plugins/built_in/web_search_tool/engines/exa_engine.py @@ -39,7 +39,7 @@ class ExaSearchEngine(BaseSearchEngine): return self.api_manager.is_available() async def search(self, args: dict[str, Any]) -> list[dict[str, Any]]: - """执行Exa搜索""" + """执行优化的Exa搜索(使用answer模式)""" if not self.is_available(): return [] @@ -47,7 +47,16 @@ class ExaSearchEngine(BaseSearchEngine): num_results = args.get("num_results", 3) time_range = args.get("time_range", "any") - exa_args = {"num_results": num_results, "text": True, "highlights": True} + # 优化的搜索参数 - 更注重答案质量 + exa_args = { + "num_results": num_results, + "text": True, + "highlights": True, + "summary": True, # 启用自动摘要 + "include_text": True, # 包含全文内容 + } + + # 时间范围过滤 if time_range != "any": today = datetime.now() start_date = today - timedelta(days=7 if time_range == "week" else 30) @@ -61,18 +70,89 @@ class ExaSearchEngine(BaseSearchEngine): return [] loop = asyncio.get_running_loop() + # 使用search_and_contents获取完整内容,优化为answer模式 func = functools.partial(exa_client.search_and_contents, query, **exa_args) search_response = await loop.run_in_executor(None, func) - return [ - { + # 优化结果处理 - 更注重答案质量 + results = [] + for res in search_response.results: + # 获取最佳内容片段 + highlights = getattr(res, "highlights", []) + summary = getattr(res, "summary", "") + text = getattr(res, "text", "") + + # 智能内容选择:摘要 > 高亮 > 文本开头 + if summary and len(summary) > 50: + snippet = summary.strip() + elif highlights: + snippet = " ".join(highlights).strip() + elif text: + snippet = text[:300] + "..." if len(text) > 300 else text + else: + snippet = "内容获取失败" + + # 只保留有意义的摘要 + if len(snippet) < 30: + snippet = text[:200] + "..." if text and len(text) > 200 else snippet + + results.append({ "title": res.title, "url": res.url, - "snippet": " ".join(getattr(res, "highlights", [])) or (getattr(res, "text", "")[:250] + "..."), + "snippet": snippet, "provider": "Exa", - } - for res in search_response.results - ] + "answer_focused": True, # 标记为答案导向的搜索 + }) + + return results except Exception as e: - logger.error(f"Exa 搜索失败: {e}") + logger.error(f"Exa answer模式搜索失败: {e}") + return [] + + async def answer_search(self, args: dict[str, Any]) -> list[dict[str, Any]]: + """执行Exa快速答案搜索 - 最精简的搜索模式""" + if not self.is_available(): + return [] + + query = args["query"] + num_results = min(args.get("num_results", 2), 2) # 限制结果数量,专注质量 + + # 精简的搜索参数 - 专注快速答案 + exa_args = { + "num_results": num_results, + "text": False, # 不需要全文 + "highlights": True, # 只要关键高亮 + "summary": True, # 优先摘要 + } + + try: + exa_client = self.api_manager.get_next_client() + if not exa_client: + return [] + + loop = asyncio.get_running_loop() + func = functools.partial(exa_client.search_and_contents, query, **exa_args) + search_response = await loop.run_in_executor(None, func) + + # 极简结果处理 - 只保留最核心信息 + results = [] + for res in search_response.results: + summary = getattr(res, "summary", "") + highlights = getattr(res, "highlights", []) + + # 优先使用摘要,否则使用高亮 + answer_text = summary.strip() if summary and len(summary) > 30 else " ".join(highlights).strip() + + if answer_text and len(answer_text) > 20: + results.append({ + "title": res.title, + "url": res.url, + "snippet": answer_text[:400] + "..." if len(answer_text) > 400 else answer_text, + "provider": "Exa-Answer", + "answer_mode": True # 标记为纯答案模式 + }) + + return results + except Exception as e: + logger.error(f"Exa快速答案搜索失败: {e}") return [] diff --git a/src/plugins/built_in/web_search_tool/tools/web_search.py b/src/plugins/built_in/web_search_tool/tools/web_search.py index 466dae538..eaac1d7e1 100644 --- a/src/plugins/built_in/web_search_tool/tools/web_search.py +++ b/src/plugins/built_in/web_search_tool/tools/web_search.py @@ -41,6 +41,13 @@ class WebSurfingTool(BaseTool): False, ["any", "week", "month"], ), + ( + "answer_mode", + ToolParamType.BOOLEAN, + "是否启用答案模式(仅适用于Exa搜索引擎)。启用后将返回更精简、直接的答案,减少冗余信息。默认为False。", + False, + None, + ), ] # type: ignore def __init__(self, plugin_config=None, chat_stream=None): @@ -97,13 +104,19 @@ class WebSurfingTool(BaseTool): ) -> dict[str, Any]: """并行搜索策略:同时使用所有启用的搜索引擎""" search_tasks = [] + answer_mode = function_args.get("answer_mode", False) for engine_name in enabled_engines: engine = self.engines.get(engine_name) if engine and engine.is_available(): custom_args = function_args.copy() custom_args["num_results"] = custom_args.get("num_results", 5) - search_tasks.append(engine.search(custom_args)) + + # 如果启用了answer模式且是Exa引擎,使用answer_search方法 + if answer_mode and engine_name == "exa" and hasattr(engine, 'answer_search'): + search_tasks.append(engine.answer_search(custom_args)) + else: + search_tasks.append(engine.search(custom_args)) if not search_tasks: @@ -137,17 +150,23 @@ class WebSurfingTool(BaseTool): self, function_args: dict[str, Any], enabled_engines: list[str] ) -> dict[str, Any]: """回退搜索策略:按顺序尝试搜索引擎,失败则尝试下一个""" + answer_mode = function_args.get("answer_mode", False) + for engine_name in enabled_engines: engine = self.engines.get(engine_name) if not engine or not engine.is_available(): - continue try: custom_args = function_args.copy() custom_args["num_results"] = custom_args.get("num_results", 5) - results = await engine.search(custom_args) + # 如果启用了answer模式且是Exa引擎,使用answer_search方法 + if answer_mode and engine_name == "exa" and hasattr(engine, 'answer_search'): + logger.info("使用Exa答案模式进行搜索(fallback策略)") + results = await engine.answer_search(custom_args) + else: + results = await engine.search(custom_args) if results: # 如果有结果,直接返回 formatted_content = format_search_results(results) @@ -164,22 +183,30 @@ class WebSurfingTool(BaseTool): async def _execute_single_search(self, function_args: dict[str, Any], enabled_engines: list[str]) -> dict[str, Any]: """单一搜索策略:只使用第一个可用的搜索引擎""" + answer_mode = function_args.get("answer_mode", False) + for engine_name in enabled_engines: engine = self.engines.get(engine_name) if not engine or not engine.is_available(): - continue try: custom_args = function_args.copy() custom_args["num_results"] = custom_args.get("num_results", 5) - results = await engine.search(custom_args) - formatted_content = format_search_results(results) - return { - "type": "web_search_result", - "content": formatted_content, - } + # 如果启用了answer模式且是Exa引擎,使用answer_search方法 + if answer_mode and engine_name == "exa" and hasattr(engine, 'answer_search'): + logger.info("使用Exa答案模式进行搜索") + results = await engine.answer_search(custom_args) + else: + results = await engine.search(custom_args) + + if results: + formatted_content = format_search_results(results) + return { + "type": "web_search_result", + "content": formatted_content, + } except Exception as e: logger.error(f"{engine_name} 搜索失败: {e}")