refactor(chat): rework the proactive thinking module to improve reply quality and logical clarity (some genius nagged me into wiring up web search)

Split the proactive thinking flow into two main phases: planning and content generation.

In the planning phase (`ActionPlanner`), the model now takes the latest chat context into account when deciding whether to start a proactive conversation and which topic fits, so the decision tracks the current mood of the chat.

In the content-generation phase (`ProactiveThinker`), the system gathers real-time information around the planned topic (schedule, web news, and so on), combines it with the persona, current mood, and chat history, and builds a richer, more context-aware prompt, producing proactive replies that feel more natural and interesting. A condensed sketch of the new flow follows the change list below.

Key changes:
- In proactive mode, `ActionPlanner` now analyzes recent chat history, making its decisions more accurate.
- `ProactiveThinker` gains a `_generate_proactive_content_and_send` method that merges multi-source information (schedule, search, context) into the final reply.
- Simplified the main logic of `ProactiveThinker` so it focuses on executing the `proactive_reply` action instead of handling multiple action types.
- Tuned the related prompts to focus on generating high-quality proactive conversation content.
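Condensed, the new control flow looks roughly like this (names are taken from the diffs below; the enclosing method name is illustrative, and error handling and logging are omitted):

# Rough sketch of the new two-phase flow, condensed from the diffs below.
async def _think_proactively(self):
    # Phase 1 (planning): the planner now sees recent chat context in PROACTIVE mode
    actions, _ = await self.cycle_processor.action_planner.plan(mode=ChatMode.PROACTIVE)
    action_result = actions[0] if actions else {}

    # Phase 2 (content generation): gather schedule, web search results, and chat
    # context around the planned topic, build one rich prompt, then send the reply
    if action_result.get("action_type") == "proactive_reply":
        await self._generate_proactive_content_and_send(action_result)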
minecraft1024a
2025-09-06 19:42:48 +08:00
parent 657b12015b
commit 8c446e5490
17 changed files with 1432 additions and 28 deletions

View File

@@ -1,12 +1,19 @@
import time
import traceback
-from typing import TYPE_CHECKING
+import orjson
+from typing import TYPE_CHECKING, Dict, Any
from src.common.logger import get_logger
from src.plugin_system.base.component_types import ChatMode
from ..hfc_context import HfcContext
from .events import ProactiveTriggerEvent
from src.plugin_system.apis import generator_api
+from src.schedule.schedule_manager import schedule_manager
+from src.plugin_system import tool_api
+from src.plugin_system.base.component_types import ComponentType
+from src.config.config import global_config
+from src.chat.utils.chat_message_builder import get_raw_msg_before_timestamp_with_chat, build_readable_messages_with_id
+from src.mood.mood_manager import mood_manager

if TYPE_CHECKING:
    from ..cycle_processor import CycleProcessor
@@ -20,6 +27,7 @@ class ProactiveThinker:
    When a ProactiveTriggerEvent is received, it makes a series of decisions and takes actions based on the event,
    such as adjusting mood, calling the planner to produce an action, and possibly ending with a proactive reply.
    """

    def __init__(self, context: HfcContext, cycle_processor: "CycleProcessor"):
        """
        Initialize the proactive thinker.
@@ -75,9 +83,6 @@
            return

        try:
-            # Dynamically import the mood manager to avoid a circular dependency
-            from src.mood.mood_manager import mood_manager
-
            # Get the mood object for the current chat
            mood_obj = mood_manager.get_mood_by_chat_id(self.context.stream_id)
            new_mood = None
@@ -112,29 +117,17 @@
        """
        try:
            # Run the planner in PROACTIVE mode to decide the next action
-            actions, target_message = await self.cycle_processor.action_planner.plan(mode=ChatMode.PROACTIVE)
+            actions, _ = await self.cycle_processor.action_planner.plan(mode=ChatMode.PROACTIVE)

            # Usually only the first planned action matters
            action_result = actions[0] if actions else {}
+            action_type = action_result.get("action_type")

-            # Check whether the planned action is "do nothing"
-            if action_result and action_result.get("action_type") != "do_nothing":
-                # If the action is "reply"
-                if action_result.get("action_type") == "reply":
-                    # Call the generator API to create the reply content
-                    success, response_set, _ = await generator_api.generate_reply(
-                        chat_stream=self.context.chat_stream,
-                        reply_message=action_result["action_message"],
-                        available_actions={},  # proactive replies do not consider tool use
-                        enable_tool=False,
-                        request_type="chat.replyer.proactive",  # tag the request type
-                        from_plugin=False,
-                    )
-                    # If the reply was generated successfully, send it out
-                    if success and response_set:
-                        await self.cycle_processor.response_handler.send_response(
-                            response_set, time.time(), action_result["action_message"]
-                        )
+            if action_type == "proactive_reply":
+                await self._generate_proactive_content_and_send(action_result)
+            elif action_type != "do_nothing":
+                logger.warning(f"{self.context.log_prefix} Proactive thinking returned an unknown action type: {action_type}")
            else:
                # If the plan says "do nothing", just log it
                logger.info(f"{self.context.log_prefix} Proactive thinking decision: stay silent")
@@ -142,3 +135,98 @@
        except Exception as e:
            logger.error(f"{self.context.log_prefix} Proactive thinking failed with an exception: {e}")
            logger.error(traceback.format_exc())

    async def _generate_proactive_content_and_send(self, action_result: Dict[str, Any]):
        """
        Gather real-time information, build the final generation prompt, then generate and send the proactive reply.

        Args:
            action_result (Dict[str, Any]): The action result returned by the planner.
        """
        try:
            topic = action_result.get("action_data", {}).get("topic", "casual chat")
            logger.info(f"{self.context.log_prefix} Proactive thinking settled on topic: '{topic}'")

            # 1. Fetch schedule information
            schedule_block = "You have no schedule today."
            if global_config.planning_system.schedule_enable:
                if current_activity := schedule_manager.get_current_activity():
                    schedule_block = f"You are currently: {current_activity}"

            # 2. Web search
            news_block = "No fresh news available right now."
            try:
                web_search_tool = tool_api.get_tool_instance("web_search")
                if web_search_tool:
                    tool_args = {"query": topic, "max_results": 10}
                    # Invoke the tool with the arguments
                    search_result_dict = await web_search_tool.execute(**tool_args)
                    if search_result_dict and not search_result_dict.get("error"):
                        news_block = search_result_dict.get("content", "Could not extract useful news.")
                    else:
                        logger.warning(f"{self.context.log_prefix} Web search returned an error: {search_result_dict.get('error')}")
                else:
                    logger.warning(f"{self.context.log_prefix} No web_search tool instance found.")
            except Exception as e:
                logger.error(f"{self.context.log_prefix} Web search failed during proactive thinking: {e}")

            # 3. Fetch the latest chat context
            message_list = get_raw_msg_before_timestamp_with_chat(
                chat_id=self.context.stream_id,
                timestamp=time.time(),
                limit=int(global_config.chat.max_context_size * 0.3),
            )
            chat_context_block, _ = build_readable_messages_with_id(messages=message_list)

            # 4. Build the final generation prompt
            bot_name = global_config.bot.nickname
            identity_block = f"Your name is {bot_name}; you {global_config.personality.personality_core}"
            mood_block = f"Your current mood is: {mood_manager.get_mood_by_chat_id(self.context.stream_id).mood_state}"

            final_prompt = f"""
# Proactive conversation generation

## Your persona
{identity_block}

## Your mood
{mood_block}

## Your schedule today
{schedule_block}

## Latest information on the topic you plan to discuss, "{topic}"
{news_block}

## Recent chat content
{chat_context_block}

## Task
You previously decided to start a conversation about "{topic}". Now, drawing on all the information above, open this topic naturally.

## Requirements
- Your message should sound spontaneous, not like reading a report.
- Weave the schedule or latest information into your opener subtly.
- Match the style of your persona.
- Output only what you want to say, with no extra content.
"""

            # 5. Call the generator API and send
            response_text = await generator_api.generate_response_custom(
                chat_stream=self.context.chat_stream,
                prompt=final_prompt,
                request_type="chat.replyer.proactive",
            )

            if response_text:
                # Wrap the plain text into the ResponseSet format
                response_set = [{"type": "text", "data": {"text": response_text}}]
                await self.cycle_processor.response_handler.send_response(
                    response_set, time.time(), action_result.get("action_message")
                )
            else:
                logger.error(f"{self.context.log_prefix} Failed to generate a proactive reply.")
        except Exception as e:
            logger.error(f"{self.context.log_prefix} Error while generating proactive reply content: {e}")
            logger.error(traceback.format_exc())

View File

@@ -90,12 +90,31 @@ def init_prompt():
## Long-term memory summary
{long_term_memory_block}

+## Recent chat content
+{chat_content_block}

## Task
-Based on all the information above, analyze the current situation and decide whether you need to do something proactively.
-If you think nothing is needed, choose 'do_nothing'.
+Based on all the information above (especially the recent chat content), analyze the current situation and decide whether this is a good moment to proactively open a **new topic that still fits the current atmosphere**.

## Available actions
-{action_options_text}
+Action: proactive_reply
+Description: Building on the current conversation, proactively start a new exchange: share an interesting thought, something you have seen or heard, or a plan for the future.
+- When you feel you could say something to liven things up without clashing with the current mood of the chat
+- When you have a new idea or plan to share that connects naturally to the current topic
+{{
+    "action": "proactive_reply",
+    "reason": "the concrete reason for deciding to start a conversation",
+    "topic": "the topic or content you want to open with (keep it concise)"
+}}
+
+Action: do_nothing
+Description: Stay silent; do not initiate any action or conversation.
+- When, after weighing all the information, you feel this is not a good moment to start an interaction
+- When the recent chat is flowing well and cutting in would interrupt people
+{{
+    "action": "do_nothing",
+    "reason": "the concrete reason for deciding to stay silent"
+}}

You must choose one of the actions listed above.
Output strictly in JSON format, containing only the JSON:
@@ -643,7 +662,19 @@ class ActionPlanner:
        # --- Build a different prompt depending on the mode ---
        if mode == ChatMode.PROACTIVE:
            long_term_memory_block = await self._get_long_term_memory_context()
-            action_options_text = await self._build_action_options(current_available_actions, mode)
+
+            # Fetch recent chat history for the proactive-thinking decision
+            message_list_short = get_raw_msg_before_timestamp_with_chat(
+                chat_id=self.chat_id,
+                timestamp=time.time(),
+                limit=int(global_config.chat.max_context_size * 0.2),  # proactive thinking only looks at a small slice of recent messages
+            )
+            chat_content_block, _ = build_readable_messages_with_id(
+                messages=message_list_short,
+                timestamp_mode="normal",
+                truncate=False,
+                show_actions=False,
+            )

            prompt_template = await global_prompt_manager.get_prompt_async("proactive_planner_prompt")
            prompt = prompt_template.format(
@@ -652,7 +683,7 @@ class ActionPlanner:
                schedule_block=schedule_block,
                mood_block=mood_block,
                long_term_memory_block=long_term_memory_block,
-                action_options_text=action_options_text,
+                chat_content_block=chat_content_block or "No recent chat content.",
            )
            return prompt, []
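The planner prompt above demands strict JSON with `action`, `reason`, and (for `proactive_reply`) `topic` fields, and the first diff adds an `orjson` import, although the parsing site itself is not shown in these hunks. A minimal sketch of decoding that output into the `action_result` shape `ProactiveThinker` reads (the helper name and fallback behavior are assumptions):

import orjson

def parse_planner_json(raw: str) -> dict:
    # Hypothetical helper: decode the planner's strict-JSON output into the
    # action_type/action_data shape that ProactiveThinker consumes.
    try:
        data = orjson.loads(raw)
    except orjson.JSONDecodeError:
        return {"action_type": "do_nothing", "reason": "unparseable planner output"}
    return {
        "action_type": data.get("action", "do_nothing"),
        "action_data": {"topic": data.get("topic", "")},
        "reason": data.get("reason", ""),
    }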

View File

@@ -0,0 +1,25 @@
{
    "manifest_version": 1,
    "name": "web_search_tool",
    "version": "1.0.0",
    "description": "A tool for searching the internet for information",
    "author": {
        "name": "MoFox-Studio",
        "url": "https://github.com/MoFox-Studio"
    },
    "license": "GPL-v3.0-or-later",
    "host_application": {
        "min_version": "0.10.0"
    },
    "keywords": ["web_search", "url_parser"],
    "categories": ["web_search", "url_parser"],
    "default_locale": "zh-CN",
    "locales_path": "_locales",
    "plugin_info": {
        "is_built_in": false,
        "plugin_type": "web_search"
    }
}

View File

@@ -0,0 +1,3 @@
"""
Search engines package
"""

View File

@@ -0,0 +1,31 @@
"""
Base search engine interface
"""
from abc import ABC, abstractmethod
from typing import Dict, List, Any


class BaseSearchEngine(ABC):
    """
    Base class for search engines.
    """

    @abstractmethod
    async def search(self, args: Dict[str, Any]) -> List[Dict[str, Any]]:
        """
        Run a search.

        Args:
            args: Search parameters, including query, num_results, time_range, etc.

        Returns:
            A list of search results; each result contains title, url, snippet, and provider fields.
        """
        pass

    @abstractmethod
    def is_available(self) -> bool:
        """
        Check whether this search engine is available.
        """
        pass

View File

@@ -0,0 +1,263 @@
"""
Bing search engine implementation
"""
import asyncio
import functools
import random
import traceback
from typing import Dict, List, Any

import requests
from bs4 import BeautifulSoup

from src.common.logger import get_logger
from .base import BaseSearchEngine

logger = get_logger("bing_engine")

ABSTRACT_MAX_LENGTH = 300  # abstract max length

user_agents = [
    # Edge
    "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/122.0.0.0 Safari/537.36 Edg/122.0.0.0",
    "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/121.0.0.0 Safari/537.36 Edg/121.0.0.0",
    "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/122.0.0.0 Safari/537.36 Edg/122.0.0.0",
    # Chrome
    "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/122.0.0.0 Safari/537.36",
    "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/121.0.0.0 Safari/537.36",
    "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/122.0.0.0 Safari/537.36",
    # Firefox
    "Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:123.0) Gecko/20100101 Firefox/123.0",
    "Mozilla/5.0 (Macintosh; Intel Mac OS X 10.15; rv:123.0) Gecko/20100101 Firefox/123.0",
    "Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:122.0) Gecko/20100101 Firefox/122.0",
]

# Request headers
HEADERS = {
    "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.7",
    "Accept-Encoding": "gzip, deflate, br",
    "Accept-Language": "zh-CN,zh;q=0.9,en;q=0.8,en-GB;q=0.7,en-US;q=0.6",
    "Cache-Control": "max-age=0",
    "Connection": "keep-alive",
    "Host": "www.bing.com",
    "Referer": "https://www.bing.com/",
    "Sec-Ch-Ua": '"Chromium";v="122", "Microsoft Edge";v="122", "Not-A.Brand";v="99"',
    "Sec-Ch-Ua-Mobile": "?0",
    "Sec-Ch-Ua-Platform": '"Windows"',
    "Sec-Fetch-Dest": "document",
    "Sec-Fetch-Mode": "navigate",
    "Sec-Fetch-Site": "same-origin",
    "Sec-Fetch-User": "?1",
    "Upgrade-Insecure-Requests": "1",
    "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/122.0.0.0 Safari/537.36 Edg/122.0.0.0",
}

bing_search_url = "https://www.bing.com/search?q="


class BingSearchEngine(BaseSearchEngine):
    """
    Bing search engine implementation.
    """

    def __init__(self):
        self.session = requests.Session()
        self.session.headers = HEADERS

    def is_available(self) -> bool:
        """Check whether the Bing search engine is available."""
        return True  # Bing is a free search engine and is always available

    async def search(self, args: Dict[str, Any]) -> List[Dict[str, Any]]:
        """Run a Bing search."""
        query = args["query"]
        num_results = args.get("num_results", 3)
        time_range = args.get("time_range", "any")

        try:
            loop = asyncio.get_running_loop()
            func = functools.partial(self._search_sync, query, num_results, time_range)
            search_response = await loop.run_in_executor(None, func)
            return search_response
        except Exception as e:
            logger.error(f"Bing search failed: {e}")
            return []

    def _search_sync(self, keyword: str, num_results: int, time_range: str) -> List[Dict[str, Any]]:
        """Run the Bing search synchronously."""
        if not keyword:
            return []

        list_result = []

        # Build the search URL
        search_url = bing_search_url + keyword

        # If a time range is given, append the time filter parameter
        if time_range == "week":
            search_url += "&qft=+filterui:date-range-7"
        elif time_range == "month":
            search_url += "&qft=+filterui:date-range-30"

        try:
            data = self._parse_html(search_url)
            if data:
                list_result.extend(data)
                logger.debug(f"Bing search [{keyword}] found {len(data)} results")
        except Exception as e:
            logger.error(f"Bing search parsing failed: {e}")
            return []

        logger.debug(f"Bing search [{keyword}] finished, {len(list_result)} results in total")
        return list_result[:num_results] if len(list_result) > num_results else list_result

    def _parse_html(self, url: str) -> List[Dict[str, Any]]:
        """Fetch and parse the results page."""
        try:
            logger.debug(f"Visiting Bing search URL: {url}")

            # Set the required cookies
            cookies = {
                "SRCHHPGUSR": "SRCHLANG=zh-Hans",  # default search language: Chinese
                "SRCHD": "AF=NOFORM",
                "SRCHUID": "V=2&GUID=1A4D4F1C8844493F9A2E3DB0D1BC806C",
                "_SS": "SID=0D89D9A3C95C60B62E7AC80CC85461B3",
                "_EDGE_S": "ui=zh-cn",  # UI language: Chinese
                "_EDGE_V": "1",
            }

            # Pick a random user agent per request to reduce the risk of being blocked
            headers = HEADERS.copy()
            headers["User-Agent"] = random.choice(user_agents)

            # Create a new session
            session = requests.Session()
            session.headers.update(headers)
            session.cookies.update(cookies)

            # Send the request
            try:
                res = session.get(url=url, timeout=(3.05, 6), verify=True, allow_redirects=True)
            except (requests.exceptions.Timeout, requests.exceptions.ConnectionError) as e:
                logger.warning(f"First request timed out, retrying: {str(e)}")
                try:
                    res = session.get(url=url, timeout=(5, 10), verify=False)
                except Exception as e2:
                    logger.error(f"Second request failed as well: {str(e2)}")
                    return []

            res.encoding = "utf-8"

            # Check the response status
            if res.status_code == 403:
                logger.error("Access denied (403 Forbidden); the IP may be rate-limited")
                return []
            if res.status_code != 200:
                logger.error(f"Bing search request failed, status code: {res.status_code}")
                return []

            # Check whether we were redirected to a login or verification page
            if "login.live.com" in res.url or "login.microsoftonline.com" in res.url:
                logger.error("Redirected to a login page; authentication may be required")
                return []
            if "https://www.bing.com/ck/a" in res.url:
                logger.error("Redirected to a verification page; likely flagged as a bot")
                return []

            # Parse the HTML
            try:
                root = BeautifulSoup(res.text, "lxml")
            except Exception:
                try:
                    root = BeautifulSoup(res.text, "html.parser")
                except Exception as e:
                    logger.error(f"HTML parsing failed: {str(e)}")
                    return []

            list_data = []

            # Try to extract search results
            # Method 1: look for the standard search result containers
            results = root.select("ol#b_results li.b_algo")
            if results:
                for _rank, result in enumerate(results, 1):
                    # Extract the title and link
                    title_link = result.select_one("h2 a")
                    if not title_link:
                        continue
                    title = title_link.get_text().strip()
                    url = title_link.get("href", "")

                    # Extract the snippet
                    abstract = ""
                    abstract_elem = result.select_one("div.b_caption p")
                    if abstract_elem:
                        abstract = abstract_elem.get_text().strip()

                    # Cap the snippet length
                    if ABSTRACT_MAX_LENGTH and len(abstract) > ABSTRACT_MAX_LENGTH:
                        abstract = abstract[:ABSTRACT_MAX_LENGTH] + "..."

                    list_data.append({
                        "title": title,
                        "url": url,
                        "snippet": abstract,
                        "provider": "Bing"
                    })

                    if len(list_data) >= 10:  # cap the number of results
                        break

            # Method 2: fall back to a heuristic if the standard method found nothing
            if not list_data:
                # Collect all candidate result links
                all_links = root.find_all("a")
                for link in all_links:
                    href = link.get("href", "")
                    text = link.get_text().strip()

                    # Filter for plausible search result links
                    if (href and text and len(text) > 10
                            and not href.startswith("javascript:")
                            and not href.startswith("#")
                            and "http" in href
                            and not any(x in href for x in [
                                "bing.com/search", "bing.com/images", "bing.com/videos",
                                "bing.com/maps", "bing.com/news", "login", "account",
                                "microsoft", "javascript"
                            ])):
                        # Try to recover a snippet from the surrounding text
                        abstract = ""
                        parent = link.parent
                        if parent and parent.get_text():
                            full_text = parent.get_text().strip()
                            if len(full_text) > len(text):
                                abstract = full_text.replace(text, "", 1).strip()

                        # Cap the snippet length
                        if ABSTRACT_MAX_LENGTH and len(abstract) > ABSTRACT_MAX_LENGTH:
                            abstract = abstract[:ABSTRACT_MAX_LENGTH] + "..."

                        list_data.append({
                            "title": text,
                            "url": href,
                            "snippet": abstract,
                            "provider": "Bing"
                        })

                        if len(list_data) >= 10:
                            break

            logger.debug(f"Parsed {len(list_data)} search results from Bing")
            return list_data
        except Exception as e:
            logger.error(f"Error while parsing the Bing page: {str(e)}")
            logger.debug(traceback.format_exc())
            return []

View File

@@ -0,0 +1,42 @@
"""
DuckDuckGo search engine implementation
"""
from typing import Dict, List, Any

from asyncddgs import aDDGS

from src.common.logger import get_logger
from .base import BaseSearchEngine

logger = get_logger("ddg_engine")


class DDGSearchEngine(BaseSearchEngine):
    """
    DuckDuckGo search engine implementation.
    """

    def is_available(self) -> bool:
        """Check whether the DuckDuckGo search engine is available."""
        return True  # DuckDuckGo needs no API key and is always available

    async def search(self, args: Dict[str, Any]) -> List[Dict[str, Any]]:
        """Run a DuckDuckGo search."""
        query = args["query"]
        num_results = args.get("num_results", 3)

        try:
            async with aDDGS() as ddgs:
                search_response = await ddgs.text(query, max_results=num_results)
                return [
                    {
                        "title": r.get("title"),
                        "url": r.get("href"),
                        "snippet": r.get("body"),
                        "provider": "DuckDuckGo"
                    }
                    for r in search_response
                ]
        except Exception as e:
            logger.error(f"DuckDuckGo search failed: {e}")
            return []

View File

@@ -0,0 +1,79 @@
"""
Exa search engine implementation
"""
import asyncio
import functools
from datetime import datetime, timedelta
from typing import Dict, List, Any

from exa_py import Exa

from src.common.logger import get_logger
from src.plugin_system.apis import config_api
from .base import BaseSearchEngine
from ..utils.api_key_manager import create_api_key_manager_from_config

logger = get_logger("exa_engine")


class ExaSearchEngine(BaseSearchEngine):
    """
    Exa search engine implementation.
    """

    def __init__(self):
        self._initialize_clients()

    def _initialize_clients(self):
        """Initialize the Exa clients."""
        # Read the API keys from the main config file
        exa_api_keys = config_api.get_global_config("exa.api_keys", None)

        # Create the API key manager
        self.api_manager = create_api_key_manager_from_config(
            exa_api_keys,
            lambda key: Exa(api_key=key),
            "Exa"
        )

    def is_available(self) -> bool:
        """Check whether the Exa search engine is available."""
        return self.api_manager.is_available()

    async def search(self, args: Dict[str, Any]) -> List[Dict[str, Any]]:
        """Run an Exa search."""
        if not self.is_available():
            return []

        query = args["query"]
        num_results = args.get("num_results", 3)
        time_range = args.get("time_range", "any")

        exa_args = {"num_results": num_results, "text": True, "highlights": True}
        if time_range != "any":
            today = datetime.now()
            start_date = today - timedelta(days=7 if time_range == "week" else 30)
            exa_args["start_published_date"] = start_date.strftime('%Y-%m-%d')

        try:
            # Get the next client from the API key manager
            exa_client = self.api_manager.get_next_client()
            if not exa_client:
                logger.error("Could not obtain an Exa client")
                return []

            loop = asyncio.get_running_loop()
            func = functools.partial(exa_client.search_and_contents, query, **exa_args)
            search_response = await loop.run_in_executor(None, func)
            return [
                {
                    "title": res.title,
                    "url": res.url,
                    "snippet": " ".join(getattr(res, 'highlights', [])) or (getattr(res, 'text', '')[:250] + '...'),
                    "provider": "Exa"
                }
                for res in search_response.results
            ]
        except Exception as e:
            logger.error(f"Exa search failed: {e}")
            return []

View File

@@ -0,0 +1,90 @@
"""
Tavily search engine implementation
"""
import asyncio
import functools
from typing import Dict, List, Any

from tavily import TavilyClient

from src.common.logger import get_logger
from src.plugin_system.apis import config_api
from .base import BaseSearchEngine
from ..utils.api_key_manager import create_api_key_manager_from_config

logger = get_logger("tavily_engine")


class TavilySearchEngine(BaseSearchEngine):
    """
    Tavily search engine implementation.
    """

    def __init__(self):
        self._initialize_clients()

    def _initialize_clients(self):
        """Initialize the Tavily clients."""
        # Read the API keys from the main config file
        tavily_api_keys = config_api.get_global_config("tavily.api_keys", None)

        # Create the API key manager
        self.api_manager = create_api_key_manager_from_config(
            tavily_api_keys,
            lambda key: TavilyClient(api_key=key),
            "Tavily"
        )

    def is_available(self) -> bool:
        """Check whether the Tavily search engine is available."""
        return self.api_manager.is_available()

    async def search(self, args: Dict[str, Any]) -> List[Dict[str, Any]]:
        """Run a Tavily search."""
        if not self.is_available():
            return []

        query = args["query"]
        num_results = args.get("num_results", 3)
        time_range = args.get("time_range", "any")

        try:
            # Get the next client from the API key manager
            tavily_client = self.api_manager.get_next_client()
            if not tavily_client:
                logger.error("Could not obtain a Tavily client")
                return []

            # Build the Tavily search parameters
            search_params = {
                "query": query,
                "max_results": num_results,
                "search_depth": "basic",
                "include_answer": False,
                "include_raw_content": False
            }

            # Adjust the parameters for the requested time range
            if time_range == "week":
                search_params["days"] = 7
            elif time_range == "month":
                search_params["days"] = 30

            loop = asyncio.get_running_loop()
            func = functools.partial(tavily_client.search, **search_params)
            search_response = await loop.run_in_executor(None, func)

            results = []
            if search_response and "results" in search_response:
                for res in search_response["results"]:
                    results.append({
                        "title": res.get("title", "Untitled"),
                        "url": res.get("url", ""),
                        "snippet": res.get("content", "")[:300] + "..." if res.get("content") else "No snippet",
                        "provider": "Tavily"
                    })
            return results
        except Exception as e:
            logger.error(f"Tavily search failed: {e}")
            return []

View File

@@ -0,0 +1,160 @@
"""
Web Search Tool Plugin

A powerful web search and URL parsing plugin that supports multiple search engines and parsing strategies.
"""
from typing import List, Tuple, Type

from src.plugin_system import (
    BasePlugin,
    register_plugin,
    ComponentInfo,
    ConfigField,
    PythonDependency
)
from src.plugin_system.apis import config_api
from src.common.logger import get_logger
from .tools.web_search import WebSurfingTool
from .tools.url_parser import URLParserTool

logger = get_logger("web_search_plugin")


@register_plugin
class WEBSEARCHPLUGIN(BasePlugin):
    """
    Web search tool plugin.

    Provides web search and URL parsing, supporting several search engines:
    - Exa (requires an API key)
    - Tavily (requires an API key)
    - DuckDuckGo (free)
    - Bing (free)
    """

    # Basic plugin information
    plugin_name: str = "web_search_tool"  # internal identifier
    enable_plugin: bool = True
    dependencies: List[str] = []  # plugin dependency list

    def __init__(self, *args, **kwargs):
        """Initialize the plugin and eagerly load all search engines."""
        super().__init__(*args, **kwargs)

        # Initialize all search engines immediately to trigger the API key managers' log output
        logger.info("🚀 Initializing all search engines...")
        try:
            from .engines.exa_engine import ExaSearchEngine
            from .engines.tavily_engine import TavilySearchEngine
            from .engines.ddg_engine import DDGSearchEngine
            from .engines.bing_engine import BingSearchEngine

            # Instantiating the engines triggers API key manager initialization
            exa_engine = ExaSearchEngine()
            tavily_engine = TavilySearchEngine()
            ddg_engine = DDGSearchEngine()
            bing_engine = BingSearchEngine()

            # Report each engine's status
            engines_status = {
                "Exa": exa_engine.is_available(),
                "Tavily": tavily_engine.is_available(),
                "DuckDuckGo": ddg_engine.is_available(),
                "Bing": bing_engine.is_available()
            }

            available_engines = [name for name, available in engines_status.items() if available]
            unavailable_engines = [name for name, available in engines_status.items() if not available]

            if available_engines:
                logger.info(f"✅ Available search engines: {', '.join(available_engines)}")
            if unavailable_engines:
                logger.info(f"❌ Unavailable search engines: {', '.join(unavailable_engines)}")
        except Exception as e:
            logger.error(f"❌ Search engine initialization failed: {e}", exc_info=True)

    # Python package dependencies
    python_dependencies: List[PythonDependency] = [
        PythonDependency(
            package_name="asyncddgs",
            description="Async DuckDuckGo search library",
            optional=False
        ),
        PythonDependency(
            package_name="exa_py",
            description="Exa search API client library",
            optional=True  # optional when no API key is configured
        ),
        PythonDependency(
            package_name="tavily",
            install_name="tavily-python",  # name used at install time
            description="Tavily search API client library",
            optional=True  # optional when no API key is configured
        ),
        PythonDependency(
            package_name="httpx",
            version=">=0.20.0",
            install_name="httpx[socks]",  # install name (includes the optional extra)
            description="HTTP client library with SOCKS proxy support",
            optional=False
        )
    ]

    config_file_name: str = "config.toml"  # config file name

    # Config section descriptions
    config_section_descriptions = {
        "plugin": "Basic plugin information",
        "proxy": "Proxy configuration for local URL parsing"
    }

    # Config schema definition
    # Note: the EXA settings and component toggles moved to the [exa] and [web_search] sections of the main config file (bot_config.toml)
    config_schema: dict = {
        "plugin": {
            "name": ConfigField(type=str, default="WEB_SEARCH_PLUGIN", description="Plugin name"),
            "version": ConfigField(type=str, default="1.0.0", description="Plugin version"),
            "enabled": ConfigField(type=bool, default=False, description="Whether the plugin is enabled"),
        },
        "proxy": {
            "http_proxy": ConfigField(
                type=str,
                default=None,
                description="HTTP proxy address, e.g. http://proxy.example.com:8080"
            ),
            "https_proxy": ConfigField(
                type=str,
                default=None,
                description="HTTPS proxy address, e.g. http://proxy.example.com:8080"
            ),
            "socks5_proxy": ConfigField(
                type=str,
                default=None,
                description="SOCKS5 proxy address, e.g. socks5://proxy.example.com:1080"
            ),
            "enable_proxy": ConfigField(
                type=bool,
                default=False,
                description="Whether to enable the proxy"
            )
        },
    }

    def get_plugin_components(self) -> List[Tuple[ComponentInfo, Type]]:
        """
        Get the plugin's component list.

        Returns:
            A list of (component info, component type) tuples.
        """
        enable_tool = []

        # Read the component toggles from the main config file
        if config_api.get_global_config("web_search.enable_web_search_tool", True):
            enable_tool.append((WebSurfingTool.get_tool_info(), WebSurfingTool))
        if config_api.get_global_config("web_search.enable_url_tool", True):
            enable_tool.append((URLParserTool.get_tool_info(), URLParserTool))

        return enable_tool
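Since the engine toggles, strategy, and API keys are read from the main config rather than the plugin's own config.toml, the corresponding bot_config.toml sections would look roughly like this (a sketch inferred from the `get_global_config` keys the code reads; the actual schema lives outside this commit):

# Hypothetical bot_config.toml excerpt, inferred from the config keys referenced above.
[exa]
api_keys = ["exa-key-1", "exa-key-2"]     # rotated round-robin by APIKeyManager

[tavily]
api_keys = ["tvly-key-1"]

[web_search]
enable_web_search_tool = true
enable_url_tool = true
enabled_engines = ["exa", "tavily", "ddg", "bing"]
search_strategy = "parallel"              # "single" | "parallel" | "fallback"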

View File

@@ -0,0 +1,3 @@
"""
Tools package
"""

View File

@@ -0,0 +1,242 @@
"""
URL parser tool implementation
"""
import asyncio
import functools
from typing import Any, Dict

from exa_py import Exa
import httpx
from bs4 import BeautifulSoup

from src.common.logger import get_logger
from src.plugin_system import BaseTool, ToolParamType, llm_api
from src.plugin_system.apis import config_api
from src.common.cache_manager import tool_cache
from ..utils.formatters import format_url_parse_results
from ..utils.url_utils import parse_urls_from_input, validate_urls
from ..utils.api_key_manager import create_api_key_manager_from_config

logger = get_logger("url_parser_tool")


class URLParserTool(BaseTool):
    """
    A tool for parsing and summarizing the content of one or more web page URLs.
    """

    name: str = "parse_url"
    description: str = "Use this tool when you need to understand the content of one or more specific web links, e.g. 'What are these pages about? [https://example.com, https://example2.com]' or 'Summarize these articles for me'."
    available_for_llm: bool = True
    parameters = [
        ("urls", ToolParamType.STRING, "The website URLs to understand", True, None),
    ]

    def __init__(self, plugin_config=None):
        super().__init__(plugin_config)
        self._initialize_exa_clients()

    def _initialize_exa_clients(self):
        """Initialize the Exa clients."""
        # Prefer the main config file; fall back to the plugin config file
        exa_api_keys = config_api.get_global_config("exa.api_keys", None)
        if exa_api_keys is None:
            # Read from the plugin config file
            exa_api_keys = self.get_config("exa.api_keys", [])

        # Create the API key manager
        self.api_manager = create_api_key_manager_from_config(
            exa_api_keys,
            lambda key: Exa(api_key=key),
            "Exa URL Parser"
        )

    async def _local_parse_and_summarize(self, url: str) -> Dict[str, Any]:
        """
        Parse the URL with local libraries (httpx, BeautifulSoup) and summarize it with an LLM.
        """
        try:
            # Read the proxy configuration
            enable_proxy = self.get_config("proxy.enable_proxy", False)
            proxies = None
            if enable_proxy:
                socks5_proxy = self.get_config("proxy.socks5_proxy", None)
                http_proxy = self.get_config("proxy.http_proxy", None)
                https_proxy = self.get_config("proxy.https_proxy", None)

                # Prefer the SOCKS5 proxy (covers all protocols)
                if socks5_proxy:
                    proxies = socks5_proxy
                    logger.info(f"Using SOCKS5 proxy: {socks5_proxy}")
                elif http_proxy or https_proxy:
                    proxies = {}
                    if http_proxy:
                        proxies["http://"] = http_proxy
                    if https_proxy:
                        proxies["https://"] = https_proxy
                    logger.info(f"Using HTTP/HTTPS proxy configuration: {proxies}")

            client_kwargs = {"timeout": 15.0, "follow_redirects": True}
            if proxies:
                client_kwargs["proxies"] = proxies

            async with httpx.AsyncClient(**client_kwargs) as client:
                response = await client.get(url)
                response.raise_for_status()

            soup = BeautifulSoup(response.text, "html.parser")
            title = soup.title.string if soup.title else "Untitled"
            for script in soup(["script", "style"]):
                script.extract()
            text = soup.get_text(separator="\n", strip=True)

            if not text:
                return {"error": "Could not extract meaningful text from the page."}

            summary_prompt = f"Based on the following web page content, write a Chinese summary of no more than 300 characters that keeps the core information and key points:\n\n---\n\nTitle: {title}\n\nContent:\n{text[:4000]}\n\n---\n\nSummary:"
            text_model = str(self.get_config("models.text_model", "replyer_1"))
            models = llm_api.get_available_models()
            model_config = models.get(text_model)
            if not model_config:
                logger.error("No LLM model configured")
                return {"error": "No LLM model configured"}

            success, summary, reasoning, model_name = await llm_api.generate_with_model(
                prompt=summary_prompt,
                model_config=model_config,
                request_type="story.generate",
                temperature=0.3,
                max_tokens=1000
            )
            if not success:
                logger.info(f"Summary generation failed: {summary}")
                return {"error": "An AI error occurred"}
            logger.info(f"Summary generated successfully: '{summary}'")
            return {
                "title": title,
                "url": url,
                "snippet": summary,
                "source": "local"
            }
        except httpx.HTTPStatusError as e:
            logger.warning(f"Local parsing of URL '{url}' failed (HTTP {e.response.status_code})")
            return {"error": f"Request failed with status code: {e.response.status_code}"}
        except Exception as e:
            logger.error(f"Unknown error while locally parsing or summarizing URL '{url}': {e}", exc_info=True)
            return {"error": f"An unknown error occurred: {str(e)}"}

    async def execute(self, function_args: Dict[str, Any]) -> Dict[str, Any]:
        """
        Extract and summarize URL content. Tries Exa first, then falls back to local parsing.
        """
        # Get the current file path for the cache key
        import os
        current_file_path = os.path.abspath(__file__)

        # Check the cache
        cached_result = await tool_cache.get(self.name, function_args, current_file_path)
        if cached_result:
            logger.info(f"Cache hit: {self.name} -> {function_args}")
            return cached_result

        urls_input = function_args.get("urls")
        if not urls_input:
            return {"error": "The URL list must not be empty."}

        # Normalize the URL input into a list
        urls = parse_urls_from_input(urls_input)
        if not urls:
            return {"error": "No valid URLs found in the given string."}

        # Validate the URL format
        valid_urls = validate_urls(urls)
        if not valid_urls:
            return {"error": "No valid URLs found."}
        urls = valid_urls

        logger.info(f"Preparing to parse {len(urls)} URLs: {urls}")

        successful_results = []
        error_messages = []
        urls_to_retry_locally = []

        # Step 1: try parsing with the Exa API
        contents_response = None
        if self.api_manager.is_available():
            logger.info(f"Parsing URLs with the Exa API: {urls}")
            try:
                # Get the next client from the API key manager
                exa_client = self.api_manager.get_next_client()
                if not exa_client:
                    logger.error("Could not obtain an Exa client")
                else:
                    loop = asyncio.get_running_loop()
                    exa_params = {"text": True, "summary": True, "highlights": True}
                    func = functools.partial(exa_client.get_contents, urls, **exa_params)
                    contents_response = await loop.run_in_executor(None, func)
            except Exception as e:
                logger.error(f"Serious exception during Exa URL parsing: {e}", exc_info=True)
                contents_response = None  # make sure it is None after an exception

        # Step 2: process the Exa response
        if contents_response and hasattr(contents_response, 'statuses'):
            results_map = {res.url: res for res in contents_response.results} if hasattr(contents_response, 'results') else {}
            if contents_response.statuses:
                for status in contents_response.statuses:
                    if status.status == 'success':
                        res = results_map.get(status.id)
                        if res:
                            summary = getattr(res, 'summary', '')
                            highlights = " ".join(getattr(res, 'highlights', []))
                            text_snippet = (getattr(res, 'text', '')[:300] + '...') if getattr(res, 'text', '') else ''
                            snippet = summary or highlights or text_snippet or 'No snippet'
                            successful_results.append({
                                "title": getattr(res, 'title', 'Untitled'),
                                "url": getattr(res, 'url', status.id),
                                "snippet": snippet,
                                "source": "exa"
                            })
                    else:
                        error_tag = getattr(status, 'error', 'unknown error')
                        logger.warning(f"Exa failed to parse URL '{status.id}': {error_tag}. Will retry locally.")
                        urls_to_retry_locally.append(status.id)
        else:
            # If Exa is unconfigured, the API call failed, or the response is invalid, retry all URLs locally
            urls_to_retry_locally.extend(url for url in urls if url not in [res['url'] for res in successful_results])

        # Step 3: parse the failed URLs locally
        if urls_to_retry_locally:
            logger.info(f"Parsing the following URLs locally: {urls_to_retry_locally}")
            local_tasks = [self._local_parse_and_summarize(url) for url in urls_to_retry_locally]
            local_results = await asyncio.gather(*local_tasks)
            for i, res in enumerate(local_results):
                url = urls_to_retry_locally[i]
                if "error" in res:
                    error_messages.append(f"URL: {url} - parsing failed: {res['error']}")
                else:
                    successful_results.append(res)

        if not successful_results:
            return {"error": "Could not fetch content from any of the given URLs.", "details": error_messages}

        formatted_content = format_url_parse_results(successful_results)
        result = {
            "type": "url_parse_result",
            "content": formatted_content,
            "errors": error_messages
        }

        # Save to the cache
        if "error" not in result:
            await tool_cache.set(self.name, function_args, current_file_path, result)

        return result

View File

@@ -0,0 +1,164 @@
"""
Web search tool implementation
"""
import asyncio
from typing import Any, Dict, List

from src.common.logger import get_logger
from src.plugin_system import BaseTool, ToolParamType
from src.plugin_system.apis import config_api
from src.common.cache_manager import tool_cache
from ..engines.exa_engine import ExaSearchEngine
from ..engines.tavily_engine import TavilySearchEngine
from ..engines.ddg_engine import DDGSearchEngine
from ..engines.bing_engine import BingSearchEngine
from ..utils.formatters import format_search_results, deduplicate_results

logger = get_logger("web_search_tool")


class WebSurfingTool(BaseTool):
    """
    Web search tool.
    """

    name: str = "web_search"
    description: str = "Performs a web search. This tool must be used when the user explicitly asks for a search, or when up-to-date information, news, or developments about companies, products, or events are needed."
    available_for_llm: bool = True
    parameters = [
        ("query", ToolParamType.STRING, "The keywords or question to search for.", True, None),
        ("num_results", ToolParamType.INTEGER, "Desired number of results per search engine; defaults to 5.", False, None),
        ("time_range", ToolParamType.STRING, "Time range for the search: 'any', 'week', or 'month'. Defaults to 'any'.", False, ["any", "week", "month"])
    ]  # type: ignore

    def __init__(self, plugin_config=None):
        super().__init__(plugin_config)
        # Initialize the search engines
        self.engines = {
            "exa": ExaSearchEngine(),
            "tavily": TavilySearchEngine(),
            "ddg": DDGSearchEngine(),
            "bing": BingSearchEngine()
        }

    async def execute(self, function_args: Dict[str, Any]) -> Dict[str, Any]:
        query = function_args.get("query")
        if not query:
            return {"error": "The search query must not be empty."}

        # Get the current file path for the cache key
        import os
        current_file_path = os.path.abspath(__file__)

        # Check the cache
        cached_result = await tool_cache.get(self.name, function_args, current_file_path, semantic_query=query)
        if cached_result:
            logger.info(f"Cache hit: {self.name} -> {function_args}")
            return cached_result

        # Read the search configuration
        enabled_engines = config_api.get_global_config("web_search.enabled_engines", ["ddg"])
        search_strategy = config_api.get_global_config("web_search.search_strategy", "single")

        logger.info(f"Starting search, strategy: {search_strategy}, enabled engines: {enabled_engines}, args: '{function_args}'")

        # Run the search according to the strategy
        if search_strategy == "parallel":
            result = await self._execute_parallel_search(function_args, enabled_engines)
        elif search_strategy == "fallback":
            result = await self._execute_fallback_search(function_args, enabled_engines)
        else:  # single
            result = await self._execute_single_search(function_args, enabled_engines)

        # Save to the cache
        if "error" not in result:
            await tool_cache.set(self.name, function_args, current_file_path, result, semantic_query=query)

        return result

    async def _execute_parallel_search(self, function_args: Dict[str, Any], enabled_engines: List[str]) -> Dict[str, Any]:
        """Parallel strategy: query all enabled search engines at once."""
        search_tasks = []
        for engine_name in enabled_engines:
            engine = self.engines.get(engine_name)
            if engine and engine.is_available():
                custom_args = function_args.copy()
                custom_args["num_results"] = custom_args.get("num_results", 5)
                search_tasks.append(engine.search(custom_args))

        if not search_tasks:
            return {"error": "No search engine is available."}

        try:
            search_results_lists = await asyncio.gather(*search_tasks, return_exceptions=True)
            all_results = []
            for result in search_results_lists:
                if isinstance(result, list):
                    all_results.extend(result)
                elif isinstance(result, Exception):
                    logger.error(f"Error during search: {result}")

            # Deduplicate and format
            unique_results = deduplicate_results(all_results)
            formatted_content = format_search_results(unique_results)
            return {
                "type": "web_search_result",
                "content": formatted_content,
            }
        except Exception as e:
            logger.error(f"Exception while running the parallel web search: {e}", exc_info=True)
            return {"error": f"A serious error occurred during the web search: {str(e)}"}

    async def _execute_fallback_search(self, function_args: Dict[str, Any], enabled_engines: List[str]) -> Dict[str, Any]:
        """Fallback strategy: try engines in order, moving on when one fails."""
        for engine_name in enabled_engines:
            engine = self.engines.get(engine_name)
            if not engine or not engine.is_available():
                continue

            try:
                custom_args = function_args.copy()
                custom_args["num_results"] = custom_args.get("num_results", 5)
                results = await engine.search(custom_args)
                if results:  # return as soon as one engine yields results
                    formatted_content = format_search_results(results)
                    return {
                        "type": "web_search_result",
                        "content": formatted_content,
                    }
            except Exception as e:
                logger.warning(f"{engine_name} search failed, trying the next engine: {e}")
                continue

        return {"error": "All search engines failed."}

    async def _execute_single_search(self, function_args: Dict[str, Any], enabled_engines: List[str]) -> Dict[str, Any]:
        """Single strategy: use only the first available search engine."""
        for engine_name in enabled_engines:
            engine = self.engines.get(engine_name)
            if not engine or not engine.is_available():
                continue

            try:
                custom_args = function_args.copy()
                custom_args["num_results"] = custom_args.get("num_results", 5)
                results = await engine.search(custom_args)
                formatted_content = format_search_results(results)
                return {
                    "type": "web_search_result",
                    "content": formatted_content,
                }
            except Exception as e:
                logger.error(f"{engine_name} search failed: {e}")
                return {"error": f"{engine_name} search failed: {str(e)}"}

        return {"error": "No search engine is available."}
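Called directly (outside the LLM tool-calling layer, which normally builds `function_args`), the tool can be driven roughly like this sketch (the query string is made up; the import assumes the plugin package is on the path):

import asyncio

async def demo():
    tool = WebSurfingTool()
    result = await tool.execute({
        "query": "latest open-source LLM releases",  # sample query, illustrative only
        "num_results": 5,
        "time_range": "week",  # one of "any", "week", "month"
    })
    if "error" in result:
        print("search failed:", result["error"])
    else:
        print(result["content"])  # formatted, deduplicated results

asyncio.run(demo())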

View File

@@ -0,0 +1,3 @@
"""
Web search tool utilities package
"""

View File

@@ -0,0 +1,84 @@
"""
API key manager with a round-robin rotation mechanism
"""
import itertools
from typing import List, Optional, TypeVar, Generic, Callable

from src.common.logger import get_logger

logger = get_logger("api_key_manager")

T = TypeVar('T')


class APIKeyManager(Generic[T]):
    """
    API key manager supporting round-robin rotation.
    """

    def __init__(self, api_keys: List[str], client_factory: Callable[[str], T], service_name: str = "Unknown"):
        """
        Initialize the API key manager.

        Args:
            api_keys: List of API keys
            client_factory: Factory function that takes an API key and returns a client instance
            service_name: Service name, used for logging
        """
        self.service_name = service_name
        self.clients: List[T] = []
        self.client_cycle: Optional[itertools.cycle] = None

        if api_keys:
            # Filter for valid API keys, dropping None, empty strings, the literal "None", etc.
            valid_keys = []
            for key in api_keys:
                if isinstance(key, str) and key.strip() and key.strip().lower() not in ("none", "null", ""):
                    valid_keys.append(key.strip())

            if valid_keys:
                try:
                    self.clients = [client_factory(key) for key in valid_keys]
                    self.client_cycle = itertools.cycle(self.clients)
                    logger.info(f"🔑 {service_name} loaded {len(valid_keys)} API keys")
                except Exception as e:
                    logger.error(f"❌ Failed to initialize {service_name} clients: {e}")
                    self.clients = []
                    self.client_cycle = None
            else:
                logger.warning(f"⚠️ {service_name} API keys are invalid (contain None or empty values); {service_name} will be unavailable")
        else:
            logger.warning(f"⚠️ {service_name} API keys are not configured; {service_name} will be unavailable")

    def is_available(self) -> bool:
        """Check whether any client is available."""
        return bool(self.clients and self.client_cycle)

    def get_next_client(self) -> Optional[T]:
        """Get the next client (round-robin)."""
        if not self.is_available():
            return None
        return next(self.client_cycle)

    def get_client_count(self) -> int:
        """Get the number of available clients."""
        return len(self.clients)


def create_api_key_manager_from_config(
    config_keys: Optional[List[str]],
    client_factory: Callable[[str], T],
    service_name: str
) -> APIKeyManager[T]:
    """
    Convenience function for creating an API key manager from config.

    Args:
        config_keys: API key list read from config
        client_factory: Client factory function
        service_name: Service name

    Returns:
        An APIKeyManager instance
    """
    api_keys = config_keys if isinstance(config_keys, list) else []
    return APIKeyManager(api_keys, client_factory, service_name)
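For example, with a dummy client factory the key filtering and round-robin rotation behave like this (illustrative only; real callers pass the `Exa` or `TavilyClient` constructors):

manager = create_api_key_manager_from_config(
    ["key-a", "key-b", "None"],     # the literal "None" entry is filtered out
    lambda key: f"client({key})",   # stand-in for Exa(api_key=key) etc.
    "Demo",
)
assert manager.is_available() and manager.get_client_count() == 2
print(manager.get_next_client())  # client(key-a)
print(manager.get_next_client())  # client(key-b)
print(manager.get_next_client())  # client(key-a) again (the cycle wraps around)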

View File

@@ -0,0 +1,57 @@
"""
Formatters for web search results
"""
from typing import List, Dict, Any


def format_search_results(results: List[Dict[str, Any]]) -> str:
    """
    Format search results into a string.
    """
    if not results:
        return "No relevant web information found."

    formatted_string = "According to web search results:\n\n"
    for i, res in enumerate(results, 1):
        title = res.get("title", 'Untitled')
        url = res.get("url", '#')
        snippet = res.get("snippet", 'No snippet')
        provider = res.get("provider", "Unknown source")
        formatted_string += f"{i}. **{title}** (from: {provider})\n"
        formatted_string += f"   - Snippet: {snippet}\n"
        formatted_string += f"   - Source: {url}\n\n"

    return formatted_string


def format_url_parse_results(results: List[Dict[str, Any]]) -> str:
    """
    Format a list of successfully parsed URL results into a concise text block.
    """
    formatted_parts = []
    for res in results:
        title = res.get('title', 'Untitled')
        url = res.get('url', '#')
        snippet = res.get('snippet', 'No snippet')
        source = res.get('source', 'unknown')
        formatted_string = f"**{title}**\n"
        formatted_string += f"**Content summary**:\n{snippet}\n"
        formatted_string += f"**Source**: {url} (parsed by {source})\n"
        formatted_parts.append(formatted_string)

    return "\n---\n".join(formatted_parts)


def deduplicate_results(results: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
    """
    Deduplicate search results by URL.
    """
    unique_urls = set()
    unique_results = []
    for res in results:
        if isinstance(res, dict) and res.get("url") and res["url"] not in unique_urls:
            unique_urls.add(res["url"])
            unique_results.append(res)
    return unique_results
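The dedup helper keeps the first occurrence of each URL, so whichever result appears earlier in the combined list wins ties (the sample data below is made up):

hits = [
    {"title": "A", "url": "https://x.test/1", "snippet": "...", "provider": "Exa"},
    {"title": "A (dup)", "url": "https://x.test/1", "snippet": "...", "provider": "Bing"},
    {"title": "B", "url": "https://x.test/2", "snippet": "...", "provider": "Bing"},
]
print([r["title"] for r in deduplicate_results(hits)])  # ['A', 'B']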

View File

@@ -0,0 +1,39 @@
"""
URL processing utilities
"""
import re
from typing import List


def parse_urls_from_input(urls_input) -> List[str]:
    """
    Parse a list of URLs from the input.
    """
    if isinstance(urls_input, str):
        # If the input is a string, try to parse it as a URL list
        # by extracting all HTTP/HTTPS URLs
        url_pattern = r'https?://[^\s\],]+'
        urls = re.findall(url_pattern, urls_input)
        if not urls:
            # If no standard URL was found, treat the whole string as a single URL
            if urls_input.strip().startswith(('http://', 'https://')):
                urls = [urls_input.strip()]
            else:
                return []
    elif isinstance(urls_input, list):
        urls = [url.strip() for url in urls_input if isinstance(url, str) and url.strip()]
    else:
        return []
    return urls


def validate_urls(urls: List[str]) -> List[str]:
    """
    Validate URL formats and return the list of valid URLs.
    """
    valid_urls = []
    for url in urls:
        if url.startswith(('http://', 'https://')):
            valid_urls.append(url)
    return valid_urls
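A quick example of the two helpers together (the input string is made up): the regex stops at whitespace, commas, and closing brackets, and non-HTTP schemes are ignored.

text = "check these: [https://example.com/a, https://example.org/b] and ftp://old.host"
urls = validate_urls(parse_urls_from_input(text))
print(urls)  # ['https://example.com/a', 'https://example.org/b']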