From 8c446e54903ebaa5851b778a2ea12bac80a3c38f Mon Sep 17 00:00:00 2001 From: minecraft1024a Date: Sat, 6 Sep 2025 19:42:48 +0800 Subject: [PATCH 1/4] =?UTF-8?q?refactor(chat):=20=E9=87=8D=E6=9E=84?= =?UTF-8?q?=E4=B8=BB=E5=8A=A8=E6=80=9D=E8=80=83=E6=A8=A1=E5=9D=97=E4=BB=A5?= =?UTF-8?q?=E6=8F=90=E5=8D=87=E5=9B=9E=E5=A4=8D=E8=B4=A8=E9=87=8F=E5=92=8C?= =?UTF-8?q?=E9=80=BB=E8=BE=91=E6=B8=85=E6=99=B0=E5=BA=A6=EF=BC=88=E5=93=AA?= =?UTF-8?q?=E4=B8=AA=E5=A4=A7=E8=81=AA=E6=98=8E=E6=8A=8A=E6=88=91=E8=81=94?= =?UTF-8?q?=E7=BD=91=E6=90=9C=E7=B4=A2=E7=83=A6=E4=BA=86=EF=BC=89?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit 将主动思考流程拆分为两个主要阶段:规划和内容生成。 在规划阶段(`ActionPlanner`),模型现在会结合最新的聊天上下文来决定是否发起主动对话,并确定一个合适的主题。这使得决策更加贴近当前对话氛围。 在内容生成阶段(`ProactiveThinker`),系统会围绕规划好的主题,主动搜集相关实时信息(如日程、网络资讯),并结合角色设定、心情和聊天历史,构建一个更丰富、更具上下文情境的提示词,从而生成更自然、更有趣的主动回复。 主要变更: - `ActionPlanner` 在主动模式下增加对近期聊天记录的分析,决策更精准。 - `ProactiveThinker` 新增 `_generate_proactive_content_and_send` 方法,负责整合多源信息(日程、搜索、上下文)生成最终回复。 - 简化了 `ProactiveThinker` 的主逻辑,使其专注于执行 `proactive_reply` 动作,而非处理多种动作类型。 - 优化了相关提示词,使其更专注于生成高质量的主动对话内容。 --- .../chat_loop/proactive/proactive_thinker.py | 134 +++++++-- src/chat/planner_actions/planner.py | 41 ++- .../built_in/web_search_tool/_manifest.json | 25 ++ .../web_search_tool/engines/__init__.py | 3 + .../built_in/web_search_tool/engines/base.py | 31 +++ .../web_search_tool/engines/bing_engine.py | 263 ++++++++++++++++++ .../web_search_tool/engines/ddg_engine.py | 42 +++ .../web_search_tool/engines/exa_engine.py | 79 ++++++ .../web_search_tool/engines/tavily_engine.py | 90 ++++++ .../built_in/web_search_tool/plugin.py | 160 +++++++++++ .../web_search_tool/tools/__init__.py | 3 + .../web_search_tool/tools/url_parser.py | 242 ++++++++++++++++ .../web_search_tool/tools/web_search.py | 164 +++++++++++ .../web_search_tool/utils/__init__.py | 3 + .../web_search_tool/utils/api_key_manager.py | 84 ++++++ .../web_search_tool/utils/formatters.py | 57 ++++ .../web_search_tool/utils/url_utils.py | 39 +++ 17 files changed, 1432 insertions(+), 28 deletions(-) create mode 100644 src/plugins/built_in/web_search_tool/_manifest.json create mode 100644 src/plugins/built_in/web_search_tool/engines/__init__.py create mode 100644 src/plugins/built_in/web_search_tool/engines/base.py create mode 100644 src/plugins/built_in/web_search_tool/engines/bing_engine.py create mode 100644 src/plugins/built_in/web_search_tool/engines/ddg_engine.py create mode 100644 src/plugins/built_in/web_search_tool/engines/exa_engine.py create mode 100644 src/plugins/built_in/web_search_tool/engines/tavily_engine.py create mode 100644 src/plugins/built_in/web_search_tool/plugin.py create mode 100644 src/plugins/built_in/web_search_tool/tools/__init__.py create mode 100644 src/plugins/built_in/web_search_tool/tools/url_parser.py create mode 100644 src/plugins/built_in/web_search_tool/tools/web_search.py create mode 100644 src/plugins/built_in/web_search_tool/utils/__init__.py create mode 100644 src/plugins/built_in/web_search_tool/utils/api_key_manager.py create mode 100644 src/plugins/built_in/web_search_tool/utils/formatters.py create mode 100644 src/plugins/built_in/web_search_tool/utils/url_utils.py diff --git a/src/chat/chat_loop/proactive/proactive_thinker.py b/src/chat/chat_loop/proactive/proactive_thinker.py index 081941b18..2ee715a4c 100644 --- a/src/chat/chat_loop/proactive/proactive_thinker.py +++ b/src/chat/chat_loop/proactive/proactive_thinker.py @@ -1,12 +1,19 @@ import time import traceback -from typing import TYPE_CHECKING +import orjson +from typing import TYPE_CHECKING, Dict, Any from src.common.logger import get_logger from src.plugin_system.base.component_types import ChatMode from ..hfc_context import HfcContext from .events import ProactiveTriggerEvent from src.plugin_system.apis import generator_api +from src.schedule.schedule_manager import schedule_manager +from src.plugin_system import tool_api +from src.plugin_system.base.component_types import ComponentType +from src.config.config import global_config +from src.chat.utils.chat_message_builder import get_raw_msg_before_timestamp_with_chat, build_readable_messages_with_id +from src.mood.mood_manager import mood_manager if TYPE_CHECKING: from ..cycle_processor import CycleProcessor @@ -20,6 +27,7 @@ class ProactiveThinker: 当接收到 ProactiveTriggerEvent 时,它会根据事件内容进行一系列决策和操作, 例如调整情绪、调用规划器生成行动,并最终可能产生一个主动的回复。 """ + def __init__(self, context: HfcContext, cycle_processor: "CycleProcessor"): """ 初始化主动思考器。 @@ -75,9 +83,6 @@ class ProactiveThinker: return try: - # 动态导入情绪管理器,避免循环依赖 - from src.mood.mood_manager import mood_manager - # 获取当前聊天的情绪对象 mood_obj = mood_manager.get_mood_by_chat_id(self.context.stream_id) new_mood = None @@ -112,29 +117,17 @@ class ProactiveThinker: """ try: # 调用规划器的 PROACTIVE 模式,让其决定下一步的行动 - actions, target_message = await self.cycle_processor.action_planner.plan(mode=ChatMode.PROACTIVE) + actions, _ = await self.cycle_processor.action_planner.plan(mode=ChatMode.PROACTIVE) # 通常只关心规划出的第一个动作 action_result = actions[0] if actions else {} - # 检查规划出的动作是否是“什么都不做” - if action_result and action_result.get("action_type") != "do_nothing": - # 如果动作是“回复” - if action_result.get("action_type") == "reply": - # 调用生成器API来创建回复内容 - success, response_set, _ = await generator_api.generate_reply( - chat_stream=self.context.chat_stream, - reply_message=action_result["action_message"], - available_actions={}, # 主动回复不考虑工具使用 - enable_tool=False, - request_type="chat.replyer.proactive", # 标记请求类型 - from_plugin=False, - ) - # 如果成功生成回复,则发送出去 - if success and response_set: - await self.cycle_processor.response_handler.send_response( - response_set, time.time(), action_result["action_message"] - ) + action_type = action_result.get("action_type") + + if action_type == "proactive_reply": + await self._generate_proactive_content_and_send(action_result) + elif action_type != "do_nothing": + logger.warning(f"{self.context.log_prefix} 主动思考返回了未知的动作类型: {action_type}") else: # 如果规划结果是“什么都不做”,则记录日志 logger.info(f"{self.context.log_prefix} 主动思考决策: 保持沉默") @@ -142,3 +135,98 @@ class ProactiveThinker: except Exception as e: logger.error(f"{self.context.log_prefix} 主动思考执行异常: {e}") logger.error(traceback.format_exc()) + + async def _generate_proactive_content_and_send(self, action_result: Dict[str, Any]): + """ + 获取实时信息,构建最终的生成提示词,并生成和发送主动回复。 + + Args: + action_result (Dict[str, Any]): 规划器返回的动作结果。 + """ + try: + topic = action_result.get("action_data", {}).get("topic", "随便聊聊") + logger.info(f"{self.context.log_prefix} 主动思考确定主题: '{topic}'") + + # 1. 获取日程信息 + schedule_block = "你今天没有日程安排。" + if global_config.planning_system.schedule_enable: + if current_activity := schedule_manager.get_current_activity(): + schedule_block = f"你当前正在:{current_activity}。" + + # 2. 网络搜索 + news_block = "暂时没有获取到最新资讯。" + try: + web_search_tool = tool_api.get_tool_instance("web_search") + if web_search_tool: + tool_args = {"query": topic, "max_results": 10} + # 调用工具,并传递参数 + search_result_dict = await web_search_tool.execute(**tool_args) + if search_result_dict and not search_result_dict.get("error"): + news_block = search_result_dict.get("content", "未能提取有效资讯。") + else: + logger.warning(f"{self.context.log_prefix} 网络搜索返回错误: {search_result_dict.get('error')}") + else: + logger.warning(f"{self.context.log_prefix} 未找到 web_search 工具实例。") + except Exception as e: + logger.error(f"{self.context.log_prefix} 主动思考时网络搜索失败: {e}") + + # 3. 获取最新的聊天上下文 + message_list = get_raw_msg_before_timestamp_with_chat( + chat_id=self.context.stream_id, + timestamp=time.time(), + limit=int(global_config.chat.max_context_size * 0.3), + ) + chat_context_block, _ = build_readable_messages_with_id(messages=message_list) + + # 4. 构建最终的生成提示词 + bot_name = global_config.bot.nickname + identity_block = f"你的名字是{bot_name},你{global_config.personality.personality_core}:" + mood_block = f"你现在的心情是:{mood_manager.get_mood_by_chat_id(self.context.stream_id).mood_state}" + + final_prompt = f""" +# 主动对话生成 + +## 你的角色 +{identity_block} + +## 你的心情 +{mood_block} + +## 你今天的日程安排 +{schedule_block} + +## 关于你准备讨论的话题“{topic}”的最新信息 +{news_block} + +## 最近的聊天内容 +{chat_context_block} + +## 任务 +你之前决定要发起一个关于“{topic}”的对话。现在,请结合以上所有信息,自然地开启这个话题。 + +## 要求 +- 你的发言要听起来像是自发的,而不是在念报告。 +- 巧妙地将日程安排或最新信息融入到你的开场白中。 +- 风格要符合你的角色设定。 +- 直接输出你想要说的内容,不要包含其他额外信息。 +""" + + # 5. 调用生成器API并发送 + response_text = await generator_api.generate_response_custom( + chat_stream=self.context.chat_stream, + prompt=final_prompt, + request_type="chat.replyer.proactive", + ) + + if response_text: + # 将纯文本包装成 ResponseSet 格式 + response_set = [{"type": "text", "data": {"text": response_text}}] + await self.cycle_processor.response_handler.send_response( + response_set, time.time(), action_result.get("action_message") + ) + else: + logger.error(f"{self.context.log_prefix} 主动思考生成回复失败。") + + except Exception as e: + logger.error(f"{self.context.log_prefix} 生成主动回复内容时异常: {e}") + logger.error(traceback.format_exc()) diff --git a/src/chat/planner_actions/planner.py b/src/chat/planner_actions/planner.py index e2efa11aa..c353df3ef 100644 --- a/src/chat/planner_actions/planner.py +++ b/src/chat/planner_actions/planner.py @@ -90,12 +90,31 @@ def init_prompt(): ## 长期记忆摘要 {long_term_memory_block} +## 最近的聊天内容 +{chat_content_block} + ## 任务 -基于以上所有信息,分析当前情况,决定是否需要主动做些什么。 -如果你认为不需要,就选择 'do_nothing'。 +基于以上所有信息(特别是最近的聊天内容),分析当前情况,决定是否适合主动开启一个**新的、但又与当前氛围相关**的话题。 ## 可用动作 -{action_options_text} +动作:proactive_reply +动作描述:在当前对话的基础上,主动发起一个新的对话,分享一个有趣的想法、见闻或者对未来的计划。 +- 当你觉得可以说些什么来活跃气氛,并且内容与当前聊天氛围不冲突时 +- 当你有一些新的想法或计划想要分享,并且可以自然地衔接当前话题时 +{{ + "action": "proactive_reply", + "reason": "决定主动发起对话的具体原因", + "topic": "你想要发起对话的主题或内容(需要简洁)" +}} + +动作:do_nothing +动作描述:保持沉默,不主动发起任何动作或对话。 +- 当你分析了所有信息后,觉得当前不是一个发起互动的好时机时 +- 当最近的聊天内容很连贯,你的插入会打断别人时 +{{ + "action": "do_nothing", + "reason":"决定保持沉默的具体原因" +}} 你必须从上面列出的可用action中选择一个。 请以严格的 JSON 格式输出,且仅包含 JSON 内容: @@ -643,7 +662,19 @@ class ActionPlanner: # --- 根据模式构建不同的Prompt --- if mode == ChatMode.PROACTIVE: long_term_memory_block = await self._get_long_term_memory_context() - action_options_text = await self._build_action_options(current_available_actions, mode) + + # 获取最近的聊天记录用于主动思考决策 + message_list_short = get_raw_msg_before_timestamp_with_chat( + chat_id=self.chat_id, + timestamp=time.time(), + limit=int(global_config.chat.max_context_size * 0.2), # 主动思考时只看少量最近消息 + ) + chat_content_block, _ = build_readable_messages_with_id( + messages=message_list_short, + timestamp_mode="normal", + truncate=False, + show_actions=False, + ) prompt_template = await global_prompt_manager.get_prompt_async("proactive_planner_prompt") prompt = prompt_template.format( @@ -652,7 +683,7 @@ class ActionPlanner: schedule_block=schedule_block, mood_block=mood_block, long_term_memory_block=long_term_memory_block, - action_options_text=action_options_text, + chat_content_block=chat_content_block or "最近没有聊天内容。", ) return prompt, [] diff --git a/src/plugins/built_in/web_search_tool/_manifest.json b/src/plugins/built_in/web_search_tool/_manifest.json new file mode 100644 index 000000000..549781c2a --- /dev/null +++ b/src/plugins/built_in/web_search_tool/_manifest.json @@ -0,0 +1,25 @@ +{ + "manifest_version": 1, + "name": "web_search_tool", + "version": "1.0.0", + "description": "一个用于在互联网上搜索信息的工具", + "author": { + "name": "MoFox-Studio", + "url": "https://github.com/MoFox-Studio" + }, + "license": "GPL-v3.0-or-later", + + "host_application": { + "min_version": "0.10.0" + }, + "keywords": ["web_search", "url_parser"], + "categories": ["web_search", "url_parser"], + + "default_locale": "zh-CN", + "locales_path": "_locales", + + "plugin_info": { + "is_built_in": false, + "plugin_type": "web_search" + } +} \ No newline at end of file diff --git a/src/plugins/built_in/web_search_tool/engines/__init__.py b/src/plugins/built_in/web_search_tool/engines/__init__.py new file mode 100644 index 000000000..2f1c3492c --- /dev/null +++ b/src/plugins/built_in/web_search_tool/engines/__init__.py @@ -0,0 +1,3 @@ +""" +Search engines package +""" diff --git a/src/plugins/built_in/web_search_tool/engines/base.py b/src/plugins/built_in/web_search_tool/engines/base.py new file mode 100644 index 000000000..f7641aa2f --- /dev/null +++ b/src/plugins/built_in/web_search_tool/engines/base.py @@ -0,0 +1,31 @@ +""" +Base search engine interface +""" +from abc import ABC, abstractmethod +from typing import Dict, List, Any + + +class BaseSearchEngine(ABC): + """ + 搜索引擎基类 + """ + + @abstractmethod + async def search(self, args: Dict[str, Any]) -> List[Dict[str, Any]]: + """ + 执行搜索 + + Args: + args: 搜索参数,包含 query、num_results、time_range 等 + + Returns: + 搜索结果列表,每个结果包含 title、url、snippet、provider 字段 + """ + pass + + @abstractmethod + def is_available(self) -> bool: + """ + 检查搜索引擎是否可用 + """ + pass diff --git a/src/plugins/built_in/web_search_tool/engines/bing_engine.py b/src/plugins/built_in/web_search_tool/engines/bing_engine.py new file mode 100644 index 000000000..ac90956e0 --- /dev/null +++ b/src/plugins/built_in/web_search_tool/engines/bing_engine.py @@ -0,0 +1,263 @@ +""" +Bing search engine implementation +""" +import asyncio +import functools +import random +import traceback +from typing import Dict, List, Any +import requests +from bs4 import BeautifulSoup + +from src.common.logger import get_logger +from .base import BaseSearchEngine + +logger = get_logger("bing_engine") + +ABSTRACT_MAX_LENGTH = 300 # abstract max length + +user_agents = [ + # Edge浏览器 + "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/122.0.0.0 Safari/537.36 Edg/122.0.0.0", + "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/121.0.0.0 Safari/537.36 Edg/121.0.0.0", + "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/122.0.0.0 Safari/537.36 Edg/122.0.0.0", + # Chrome浏览器 + "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/122.0.0.0 Safari/537.36", + "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/121.0.0.0 Safari/537.36", + "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/122.0.0.0 Safari/537.36", + # Firefox浏览器 + "Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:123.0) Gecko/20100101 Firefox/123.0", + "Mozilla/5.0 (Macintosh; Intel Mac OS X 10.15; rv:123.0) Gecko/20100101 Firefox/123.0", + "Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:122.0) Gecko/20100101 Firefox/122.0", +] + +# 请求头信息 +HEADERS = { + "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.7", + "Accept-Encoding": "gzip, deflate, br", + "Accept-Language": "zh-CN,zh;q=0.9,en;q=0.8,en-GB;q=0.7,en-US;q=0.6", + "Cache-Control": "max-age=0", + "Connection": "keep-alive", + "Host": "www.bing.com", + "Referer": "https://www.bing.com/", + "Sec-Ch-Ua": '"Chromium";v="122", "Microsoft Edge";v="122", "Not-A.Brand";v="99"', + "Sec-Ch-Ua-Mobile": "?0", + "Sec-Ch-Ua-Platform": '"Windows"', + "Sec-Fetch-Dest": "document", + "Sec-Fetch-Mode": "navigate", + "Sec-Fetch-Site": "same-origin", + "Sec-Fetch-User": "?1", + "Upgrade-Insecure-Requests": "1", + "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/122.0.0.0 Safari/537.36 Edg/122.0.0.0", +} + +bing_search_url = "https://www.bing.com/search?q=" + + +class BingSearchEngine(BaseSearchEngine): + """ + Bing搜索引擎实现 + """ + + def __init__(self): + self.session = requests.Session() + self.session.headers = HEADERS + + def is_available(self) -> bool: + """检查Bing搜索引擎是否可用""" + return True # Bing是免费搜索引擎,总是可用 + + async def search(self, args: Dict[str, Any]) -> List[Dict[str, Any]]: + """执行Bing搜索""" + query = args["query"] + num_results = args.get("num_results", 3) + time_range = args.get("time_range", "any") + + try: + loop = asyncio.get_running_loop() + func = functools.partial(self._search_sync, query, num_results, time_range) + search_response = await loop.run_in_executor(None, func) + return search_response + except Exception as e: + logger.error(f"Bing 搜索失败: {e}") + return [] + + def _search_sync(self, keyword: str, num_results: int, time_range: str) -> List[Dict[str, Any]]: + """同步执行Bing搜索""" + if not keyword: + return [] + + list_result = [] + + # 构建搜索URL + search_url = bing_search_url + keyword + + # 如果指定了时间范围,添加时间过滤参数 + if time_range == "week": + search_url += "&qft=+filterui:date-range-7" + elif time_range == "month": + search_url += "&qft=+filterui:date-range-30" + + try: + data = self._parse_html(search_url) + if data: + list_result.extend(data) + logger.debug(f"Bing搜索 [{keyword}] 找到 {len(data)} 个结果") + + except Exception as e: + logger.error(f"Bing搜索解析失败: {e}") + return [] + + logger.debug(f"Bing搜索 [{keyword}] 完成,总共 {len(list_result)} 个结果") + return list_result[:num_results] if len(list_result) > num_results else list_result + + def _parse_html(self, url: str) -> List[Dict[str, Any]]: + """解析处理结果""" + try: + logger.debug(f"访问Bing搜索URL: {url}") + + # 设置必要的Cookie + cookies = { + "SRCHHPGUSR": "SRCHLANG=zh-Hans", # 设置默认搜索语言为中文 + "SRCHD": "AF=NOFORM", + "SRCHUID": "V=2&GUID=1A4D4F1C8844493F9A2E3DB0D1BC806C", + "_SS": "SID=0D89D9A3C95C60B62E7AC80CC85461B3", + "_EDGE_S": "ui=zh-cn", # 设置界面语言为中文 + "_EDGE_V": "1", + } + + # 为每次请求随机选择不同的用户代理,降低被屏蔽风险 + headers = HEADERS.copy() + headers["User-Agent"] = random.choice(user_agents) + + # 创建新的session + session = requests.Session() + session.headers.update(headers) + session.cookies.update(cookies) + + # 发送请求 + try: + res = session.get(url=url, timeout=(3.05, 6), verify=True, allow_redirects=True) + except (requests.exceptions.Timeout, requests.exceptions.ConnectionError) as e: + logger.warning(f"第一次请求超时,正在重试: {str(e)}") + try: + res = session.get(url=url, timeout=(5, 10), verify=False) + except Exception as e2: + logger.error(f"第二次请求也失败: {str(e2)}") + return [] + + res.encoding = "utf-8" + + # 检查响应状态 + if res.status_code == 403: + logger.error("被禁止访问 (403 Forbidden),可能是IP被限制") + return [] + + if res.status_code != 200: + logger.error(f"必应搜索请求失败,状态码: {res.status_code}") + return [] + + # 检查是否被重定向到登录页面或验证页面 + if "login.live.com" in res.url or "login.microsoftonline.com" in res.url: + logger.error("被重定向到登录页面,可能需要登录") + return [] + + if "https://www.bing.com/ck/a" in res.url: + logger.error("被重定向到验证页面,可能被识别为机器人") + return [] + + # 解析HTML + try: + root = BeautifulSoup(res.text, "lxml") + except Exception: + try: + root = BeautifulSoup(res.text, "html.parser") + except Exception as e: + logger.error(f"HTML解析失败: {str(e)}") + return [] + + list_data = [] + + # 尝试提取搜索结果 + # 方法1: 查找标准的搜索结果容器 + results = root.select("ol#b_results li.b_algo") + + if results: + for _rank, result in enumerate(results, 1): + # 提取标题和链接 + title_link = result.select_one("h2 a") + if not title_link: + continue + + title = title_link.get_text().strip() + url = title_link.get("href", "") + + # 提取摘要 + abstract = "" + abstract_elem = result.select_one("div.b_caption p") + if abstract_elem: + abstract = abstract_elem.get_text().strip() + + # 限制摘要长度 + if ABSTRACT_MAX_LENGTH and len(abstract) > ABSTRACT_MAX_LENGTH: + abstract = abstract[:ABSTRACT_MAX_LENGTH] + "..." + + list_data.append({ + "title": title, + "url": url, + "snippet": abstract, + "provider": "Bing" + }) + + if len(list_data) >= 10: # 限制结果数量 + break + + # 方法2: 如果标准方法没找到结果,使用备用方法 + if not list_data: + # 查找所有可能的搜索结果链接 + all_links = root.find_all("a") + + for link in all_links: + href = link.get("href", "") + text = link.get_text().strip() + + # 过滤有效的搜索结果链接 + if (href and text and len(text) > 10 + and not href.startswith("javascript:") + and not href.startswith("#") + and "http" in href + and not any(x in href for x in [ + "bing.com/search", "bing.com/images", "bing.com/videos", + "bing.com/maps", "bing.com/news", "login", "account", + "microsoft", "javascript" + ])): + + # 尝试获取摘要 + abstract = "" + parent = link.parent + if parent and parent.get_text(): + full_text = parent.get_text().strip() + if len(full_text) > len(text): + abstract = full_text.replace(text, "", 1).strip() + + # 限制摘要长度 + if ABSTRACT_MAX_LENGTH and len(abstract) > ABSTRACT_MAX_LENGTH: + abstract = abstract[:ABSTRACT_MAX_LENGTH] + "..." + + list_data.append({ + "title": text, + "url": href, + "snippet": abstract, + "provider": "Bing" + }) + + if len(list_data) >= 10: + break + + logger.debug(f"从Bing解析到 {len(list_data)} 个搜索结果") + return list_data + + except Exception as e: + logger.error(f"解析Bing页面时出错: {str(e)}") + logger.debug(traceback.format_exc()) + return [] diff --git a/src/plugins/built_in/web_search_tool/engines/ddg_engine.py b/src/plugins/built_in/web_search_tool/engines/ddg_engine.py new file mode 100644 index 000000000..011935e27 --- /dev/null +++ b/src/plugins/built_in/web_search_tool/engines/ddg_engine.py @@ -0,0 +1,42 @@ +""" +DuckDuckGo search engine implementation +""" +from typing import Dict, List, Any +from asyncddgs import aDDGS + +from src.common.logger import get_logger +from .base import BaseSearchEngine + +logger = get_logger("ddg_engine") + + +class DDGSearchEngine(BaseSearchEngine): + """ + DuckDuckGo搜索引擎实现 + """ + + def is_available(self) -> bool: + """检查DuckDuckGo搜索引擎是否可用""" + return True # DuckDuckGo不需要API密钥,总是可用 + + async def search(self, args: Dict[str, Any]) -> List[Dict[str, Any]]: + """执行DuckDuckGo搜索""" + query = args["query"] + num_results = args.get("num_results", 3) + + try: + async with aDDGS() as ddgs: + search_response = await ddgs.text(query, max_results=num_results) + + return [ + { + "title": r.get("title"), + "url": r.get("href"), + "snippet": r.get("body"), + "provider": "DuckDuckGo" + } + for r in search_response + ] + except Exception as e: + logger.error(f"DuckDuckGo 搜索失败: {e}") + return [] diff --git a/src/plugins/built_in/web_search_tool/engines/exa_engine.py b/src/plugins/built_in/web_search_tool/engines/exa_engine.py new file mode 100644 index 000000000..2bb515e8e --- /dev/null +++ b/src/plugins/built_in/web_search_tool/engines/exa_engine.py @@ -0,0 +1,79 @@ +""" +Exa search engine implementation +""" +import asyncio +import functools +from datetime import datetime, timedelta +from typing import Dict, List, Any +from exa_py import Exa + +from src.common.logger import get_logger +from src.plugin_system.apis import config_api +from .base import BaseSearchEngine +from ..utils.api_key_manager import create_api_key_manager_from_config + +logger = get_logger("exa_engine") + + +class ExaSearchEngine(BaseSearchEngine): + """ + Exa搜索引擎实现 + """ + + def __init__(self): + self._initialize_clients() + + def _initialize_clients(self): + """初始化Exa客户端""" + # 从主配置文件读取API密钥 + exa_api_keys = config_api.get_global_config("exa.api_keys", None) + + # 创建API密钥管理器 + self.api_manager = create_api_key_manager_from_config( + exa_api_keys, + lambda key: Exa(api_key=key), + "Exa" + ) + + def is_available(self) -> bool: + """检查Exa搜索引擎是否可用""" + return self.api_manager.is_available() + + async def search(self, args: Dict[str, Any]) -> List[Dict[str, Any]]: + """执行Exa搜索""" + if not self.is_available(): + return [] + + query = args["query"] + num_results = args.get("num_results", 3) + time_range = args.get("time_range", "any") + + exa_args = {"num_results": num_results, "text": True, "highlights": True} + if time_range != "any": + today = datetime.now() + start_date = today - timedelta(days=7 if time_range == "week" else 30) + exa_args["start_published_date"] = start_date.strftime('%Y-%m-%d') + + try: + # 使用API密钥管理器获取下一个客户端 + exa_client = self.api_manager.get_next_client() + if not exa_client: + logger.error("无法获取Exa客户端") + return [] + + loop = asyncio.get_running_loop() + func = functools.partial(exa_client.search_and_contents, query, **exa_args) + search_response = await loop.run_in_executor(None, func) + + return [ + { + "title": res.title, + "url": res.url, + "snippet": " ".join(getattr(res, 'highlights', [])) or (getattr(res, 'text', '')[:250] + '...'), + "provider": "Exa" + } + for res in search_response.results + ] + except Exception as e: + logger.error(f"Exa 搜索失败: {e}") + return [] diff --git a/src/plugins/built_in/web_search_tool/engines/tavily_engine.py b/src/plugins/built_in/web_search_tool/engines/tavily_engine.py new file mode 100644 index 000000000..affb303fc --- /dev/null +++ b/src/plugins/built_in/web_search_tool/engines/tavily_engine.py @@ -0,0 +1,90 @@ +""" +Tavily search engine implementation +""" +import asyncio +import functools +from typing import Dict, List, Any +from tavily import TavilyClient + +from src.common.logger import get_logger +from src.plugin_system.apis import config_api +from .base import BaseSearchEngine +from ..utils.api_key_manager import create_api_key_manager_from_config + +logger = get_logger("tavily_engine") + + +class TavilySearchEngine(BaseSearchEngine): + """ + Tavily搜索引擎实现 + """ + + def __init__(self): + self._initialize_clients() + + def _initialize_clients(self): + """初始化Tavily客户端""" + # 从主配置文件读取API密钥 + tavily_api_keys = config_api.get_global_config("tavily.api_keys", None) + + # 创建API密钥管理器 + self.api_manager = create_api_key_manager_from_config( + tavily_api_keys, + lambda key: TavilyClient(api_key=key), + "Tavily" + ) + + def is_available(self) -> bool: + """检查Tavily搜索引擎是否可用""" + return self.api_manager.is_available() + + async def search(self, args: Dict[str, Any]) -> List[Dict[str, Any]]: + """执行Tavily搜索""" + if not self.is_available(): + return [] + + query = args["query"] + num_results = args.get("num_results", 3) + time_range = args.get("time_range", "any") + + try: + # 使用API密钥管理器获取下一个客户端 + tavily_client = self.api_manager.get_next_client() + if not tavily_client: + logger.error("无法获取Tavily客户端") + return [] + + # 构建Tavily搜索参数 + search_params = { + "query": query, + "max_results": num_results, + "search_depth": "basic", + "include_answer": False, + "include_raw_content": False + } + + # 根据时间范围调整搜索参数 + if time_range == "week": + search_params["days"] = 7 + elif time_range == "month": + search_params["days"] = 30 + + loop = asyncio.get_running_loop() + func = functools.partial(tavily_client.search, **search_params) + search_response = await loop.run_in_executor(None, func) + + results = [] + if search_response and "results" in search_response: + for res in search_response["results"]: + results.append({ + "title": res.get("title", "无标题"), + "url": res.get("url", ""), + "snippet": res.get("content", "")[:300] + "..." if res.get("content") else "无摘要", + "provider": "Tavily" + }) + + return results + + except Exception as e: + logger.error(f"Tavily 搜索失败: {e}") + return [] diff --git a/src/plugins/built_in/web_search_tool/plugin.py b/src/plugins/built_in/web_search_tool/plugin.py new file mode 100644 index 000000000..1789062ae --- /dev/null +++ b/src/plugins/built_in/web_search_tool/plugin.py @@ -0,0 +1,160 @@ +""" +Web Search Tool Plugin + +一个功能强大的网络搜索和URL解析插件,支持多种搜索引擎和解析策略。 +""" +from typing import List, Tuple, Type + +from src.plugin_system import ( + BasePlugin, + register_plugin, + ComponentInfo, + ConfigField, + PythonDependency +) +from src.plugin_system.apis import config_api +from src.common.logger import get_logger + +from .tools.web_search import WebSurfingTool +from .tools.url_parser import URLParserTool + +logger = get_logger("web_search_plugin") + + +@register_plugin +class WEBSEARCHPLUGIN(BasePlugin): + """ + 网络搜索工具插件 + + 提供网络搜索和URL解析功能,支持多种搜索引擎: + - Exa (需要API密钥) + - Tavily (需要API密钥) + - DuckDuckGo (免费) + - Bing (免费) + """ + + # 插件基本信息 + plugin_name: str = "web_search_tool" # 内部标识符 + enable_plugin: bool = True + dependencies: List[str] = [] # 插件依赖列表 + + def __init__(self, *args, **kwargs): + """初始化插件,立即加载所有搜索引擎""" + super().__init__(*args, **kwargs) + + # 立即初始化所有搜索引擎,触发API密钥管理器的日志输出 + logger.info("🚀 正在初始化所有搜索引擎...") + try: + from .engines.exa_engine import ExaSearchEngine + from .engines.tavily_engine import TavilySearchEngine + from .engines.ddg_engine import DDGSearchEngine + from .engines.bing_engine import BingSearchEngine + + # 实例化所有搜索引擎,这会触发API密钥管理器的初始化 + exa_engine = ExaSearchEngine() + tavily_engine = TavilySearchEngine() + ddg_engine = DDGSearchEngine() + bing_engine = BingSearchEngine() + + # 报告每个引擎的状态 + engines_status = { + "Exa": exa_engine.is_available(), + "Tavily": tavily_engine.is_available(), + "DuckDuckGo": ddg_engine.is_available(), + "Bing": bing_engine.is_available() + } + + available_engines = [name for name, available in engines_status.items() if available] + unavailable_engines = [name for name, available in engines_status.items() if not available] + + if available_engines: + logger.info(f"✅ 可用搜索引擎: {', '.join(available_engines)}") + if unavailable_engines: + logger.info(f"❌ 不可用搜索引擎: {', '.join(unavailable_engines)}") + + except Exception as e: + logger.error(f"❌ 搜索引擎初始化失败: {e}", exc_info=True) + + # Python包依赖列表 + python_dependencies: List[PythonDependency] = [ + PythonDependency( + package_name="asyncddgs", + description="异步DuckDuckGo搜索库", + optional=False + ), + PythonDependency( + package_name="exa_py", + description="Exa搜索API客户端库", + optional=True # 如果没有API密钥,这个是可选的 + ), + PythonDependency( + package_name="tavily", + install_name="tavily-python", # 安装时使用这个名称 + description="Tavily搜索API客户端库", + optional=True # 如果没有API密钥,这个是可选的 + ), + PythonDependency( + package_name="httpx", + version=">=0.20.0", + install_name="httpx[socks]", # 安装时使用这个名称(包含可选依赖) + description="支持SOCKS代理的HTTP客户端库", + optional=False + ) + ] + config_file_name: str = "config.toml" # 配置文件名 + + # 配置节描述 + config_section_descriptions = { + "plugin": "插件基本信息", + "proxy": "链接本地解析代理配置" + } + + # 配置Schema定义 + # 注意:EXA配置和组件设置已迁移到主配置文件(bot_config.toml)的[exa]和[web_search]部分 + config_schema: dict = { + "plugin": { + "name": ConfigField(type=str, default="WEB_SEARCH_PLUGIN", description="插件名称"), + "version": ConfigField(type=str, default="1.0.0", description="插件版本"), + "enabled": ConfigField(type=bool, default=False, description="是否启用插件"), + }, + "proxy": { + "http_proxy": ConfigField( + type=str, + default=None, + description="HTTP代理地址,格式如: http://proxy.example.com:8080" + ), + "https_proxy": ConfigField( + type=str, + default=None, + description="HTTPS代理地址,格式如: http://proxy.example.com:8080" + ), + "socks5_proxy": ConfigField( + type=str, + default=None, + description="SOCKS5代理地址,格式如: socks5://proxy.example.com:1080" + ), + "enable_proxy": ConfigField( + type=bool, + default=False, + description="是否启用代理" + ) + }, + } + + def get_plugin_components(self) -> List[Tuple[ComponentInfo, Type]]: + """ + 获取插件组件列表 + + Returns: + 组件信息和类型的元组列表 + """ + enable_tool = [] + + # 从主配置文件读取组件启用配置 + if config_api.get_global_config("web_search.enable_web_search_tool", True): + enable_tool.append((WebSurfingTool.get_tool_info(), WebSurfingTool)) + + if config_api.get_global_config("web_search.enable_url_tool", True): + enable_tool.append((URLParserTool.get_tool_info(), URLParserTool)) + + return enable_tool diff --git a/src/plugins/built_in/web_search_tool/tools/__init__.py b/src/plugins/built_in/web_search_tool/tools/__init__.py new file mode 100644 index 000000000..480099acd --- /dev/null +++ b/src/plugins/built_in/web_search_tool/tools/__init__.py @@ -0,0 +1,3 @@ +""" +Tools package +""" diff --git a/src/plugins/built_in/web_search_tool/tools/url_parser.py b/src/plugins/built_in/web_search_tool/tools/url_parser.py new file mode 100644 index 000000000..315e06271 --- /dev/null +++ b/src/plugins/built_in/web_search_tool/tools/url_parser.py @@ -0,0 +1,242 @@ +""" +URL parser tool implementation +""" +import asyncio +import functools +from typing import Any, Dict +from exa_py import Exa +import httpx +from bs4 import BeautifulSoup + +from src.common.logger import get_logger +from src.plugin_system import BaseTool, ToolParamType, llm_api +from src.plugin_system.apis import config_api +from src.common.cache_manager import tool_cache + +from ..utils.formatters import format_url_parse_results +from ..utils.url_utils import parse_urls_from_input, validate_urls +from ..utils.api_key_manager import create_api_key_manager_from_config + +logger = get_logger("url_parser_tool") + + +class URLParserTool(BaseTool): + """ + 一个用于解析和总结一个或多个网页URL内容的工具。 + """ + name: str = "parse_url" + description: str = "当需要理解一个或多个特定网页链接的内容时,使用此工具。例如:'这些网页讲了什么?[https://example.com, https://example2.com]' 或 '帮我总结一下这些文章'" + available_for_llm: bool = True + parameters = [ + ("urls", ToolParamType.STRING, "要理解的网站", True, None), + ] + + def __init__(self, plugin_config=None): + super().__init__(plugin_config) + self._initialize_exa_clients() + + def _initialize_exa_clients(self): + """初始化Exa客户端""" + # 优先从主配置文件读取,如果没有则从插件配置文件读取 + exa_api_keys = config_api.get_global_config("exa.api_keys", None) + if exa_api_keys is None: + # 从插件配置文件读取 + exa_api_keys = self.get_config("exa.api_keys", []) + + # 创建API密钥管理器 + self.api_manager = create_api_key_manager_from_config( + exa_api_keys, + lambda key: Exa(api_key=key), + "Exa URL Parser" + ) + + async def _local_parse_and_summarize(self, url: str) -> Dict[str, Any]: + """ + 使用本地库(httpx, BeautifulSoup)解析URL,并调用LLM进行总结。 + """ + try: + # 读取代理配置 + enable_proxy = self.get_config("proxy.enable_proxy", False) + proxies = None + + if enable_proxy: + socks5_proxy = self.get_config("proxy.socks5_proxy", None) + http_proxy = self.get_config("proxy.http_proxy", None) + https_proxy = self.get_config("proxy.https_proxy", None) + + # 优先使用SOCKS5代理(全协议代理) + if socks5_proxy: + proxies = socks5_proxy + logger.info(f"使用SOCKS5代理: {socks5_proxy}") + elif http_proxy or https_proxy: + proxies = {} + if http_proxy: + proxies["http://"] = http_proxy + if https_proxy: + proxies["https://"] = https_proxy + logger.info(f"使用HTTP/HTTPS代理配置: {proxies}") + + client_kwargs = {"timeout": 15.0, "follow_redirects": True} + if proxies: + client_kwargs["proxies"] = proxies + + async with httpx.AsyncClient(**client_kwargs) as client: + response = await client.get(url) + response.raise_for_status() + + soup = BeautifulSoup(response.text, "html.parser") + + title = soup.title.string if soup.title else "无标题" + for script in soup(["script", "style"]): + script.extract() + text = soup.get_text(separator="\n", strip=True) + + if not text: + return {"error": "无法从页面提取有效文本内容。"} + + summary_prompt = f"请根据以下网页内容,生成一段不超过300字的中文摘要,保留核心信息和关键点:\n\n---\n\n标题: {title}\n\n内容:\n{text[:4000]}\n\n---\n\n摘要:" + + text_model = str(self.get_config("models.text_model", "replyer_1")) + models = llm_api.get_available_models() + model_config = models.get(text_model) + if not model_config: + logger.error("未配置LLM模型") + return {"error": "未配置LLM模型"} + + success, summary, reasoning, model_name = await llm_api.generate_with_model( + prompt=summary_prompt, + model_config=model_config, + request_type="story.generate", + temperature=0.3, + max_tokens=1000 + ) + + if not success: + logger.info(f"生成摘要失败: {summary}") + return {"error": "发生ai错误"} + + logger.info(f"成功生成摘要内容:'{summary}'") + + return { + "title": title, + "url": url, + "snippet": summary, + "source": "local" + } + + except httpx.HTTPStatusError as e: + logger.warning(f"本地解析URL '{url}' 失败 (HTTP {e.response.status_code})") + return {"error": f"请求失败,状态码: {e.response.status_code}"} + except Exception as e: + logger.error(f"本地解析或总结URL '{url}' 时发生未知异常: {e}", exc_info=True) + return {"error": f"发生未知错误: {str(e)}"} + + async def execute(self, function_args: Dict[str, Any]) -> Dict[str, Any]: + """ + 执行URL内容提取和总结。优先使用Exa,失败后尝试本地解析。 + """ + # 获取当前文件路径用于缓存键 + import os + current_file_path = os.path.abspath(__file__) + + # 检查缓存 + cached_result = await tool_cache.get(self.name, function_args, current_file_path) + if cached_result: + logger.info(f"缓存命中: {self.name} -> {function_args}") + return cached_result + + urls_input = function_args.get("urls") + if not urls_input: + return {"error": "URL列表不能为空。"} + + # 处理URL输入,确保是列表格式 + urls = parse_urls_from_input(urls_input) + if not urls: + return {"error": "提供的字符串中未找到有效的URL。"} + + # 验证URL格式 + valid_urls = validate_urls(urls) + if not valid_urls: + return {"error": "未找到有效的URL。"} + + urls = valid_urls + logger.info(f"准备解析 {len(urls)} 个URL: {urls}") + + successful_results = [] + error_messages = [] + urls_to_retry_locally = [] + + # 步骤 1: 尝试使用 Exa API 进行解析 + contents_response = None + if self.api_manager.is_available(): + logger.info(f"开始使用 Exa API 解析URL: {urls}") + try: + # 使用API密钥管理器获取下一个客户端 + exa_client = self.api_manager.get_next_client() + if not exa_client: + logger.error("无法获取Exa客户端") + else: + loop = asyncio.get_running_loop() + exa_params = {"text": True, "summary": True, "highlights": True} + func = functools.partial(exa_client.get_contents, urls, **exa_params) + contents_response = await loop.run_in_executor(None, func) + except Exception as e: + logger.error(f"执行 Exa URL解析时发生严重异常: {e}", exc_info=True) + contents_response = None # 确保异常后为None + + # 步骤 2: 处理Exa的响应 + if contents_response and hasattr(contents_response, 'statuses'): + results_map = {res.url: res for res in contents_response.results} if hasattr(contents_response, 'results') else {} + if contents_response.statuses: + for status in contents_response.statuses: + if status.status == 'success': + res = results_map.get(status.id) + if res: + summary = getattr(res, 'summary', '') + highlights = " ".join(getattr(res, 'highlights', [])) + text_snippet = (getattr(res, 'text', '')[:300] + '...') if getattr(res, 'text', '') else '' + snippet = summary or highlights or text_snippet or '无摘要' + + successful_results.append({ + "title": getattr(res, 'title', '无标题'), + "url": getattr(res, 'url', status.id), + "snippet": snippet, + "source": "exa" + }) + else: + error_tag = getattr(status, 'error', '未知错误') + logger.warning(f"Exa解析URL '{status.id}' 失败: {error_tag}。准备本地重试。") + urls_to_retry_locally.append(status.id) + else: + # 如果Exa未配置、API调用失败或返回无效响应,则所有URL都进入本地重试 + urls_to_retry_locally.extend(url for url in urls if url not in [res['url'] for res in successful_results]) + + # 步骤 3: 对失败的URL进行本地解析 + if urls_to_retry_locally: + logger.info(f"开始本地解析以下URL: {urls_to_retry_locally}") + local_tasks = [self._local_parse_and_summarize(url) for url in urls_to_retry_locally] + local_results = await asyncio.gather(*local_tasks) + + for i, res in enumerate(local_results): + url = urls_to_retry_locally[i] + if "error" in res: + error_messages.append(f"URL: {url} - 解析失败: {res['error']}") + else: + successful_results.append(res) + + if not successful_results: + return {"error": "无法从所有给定的URL获取内容。", "details": error_messages} + + formatted_content = format_url_parse_results(successful_results) + + result = { + "type": "url_parse_result", + "content": formatted_content, + "errors": error_messages + } + + # 保存到缓存 + if "error" not in result: + await tool_cache.set(self.name, function_args, current_file_path, result) + + return result diff --git a/src/plugins/built_in/web_search_tool/tools/web_search.py b/src/plugins/built_in/web_search_tool/tools/web_search.py new file mode 100644 index 000000000..c09ad5e92 --- /dev/null +++ b/src/plugins/built_in/web_search_tool/tools/web_search.py @@ -0,0 +1,164 @@ +""" +Web search tool implementation +""" +import asyncio +from typing import Any, Dict, List + +from src.common.logger import get_logger +from src.plugin_system import BaseTool, ToolParamType +from src.plugin_system.apis import config_api +from src.common.cache_manager import tool_cache + +from ..engines.exa_engine import ExaSearchEngine +from ..engines.tavily_engine import TavilySearchEngine +from ..engines.ddg_engine import DDGSearchEngine +from ..engines.bing_engine import BingSearchEngine +from ..utils.formatters import format_search_results, deduplicate_results + +logger = get_logger("web_search_tool") + + +class WebSurfingTool(BaseTool): + """ + 网络搜索工具 + """ + name: str = "web_search" + description: str = "用于执行网络搜索。当用户明确要求搜索,或者需要获取关于公司、产品、事件的最新信息、新闻或动态时,必须使用此工具" + available_for_llm: bool = True + parameters = [ + ("query", ToolParamType.STRING, "要搜索的关键词或问题。", True, None), + ("num_results", ToolParamType.INTEGER, "期望每个搜索引擎返回的搜索结果数量,默认为5。", False, None), + ("time_range", ToolParamType.STRING, "指定搜索的时间范围,可以是 'any', 'week', 'month'。默认为 'any'。", False, ["any", "week", "month"]) + ] # type: ignore + + def __init__(self, plugin_config=None): + super().__init__(plugin_config) + # 初始化搜索引擎 + self.engines = { + "exa": ExaSearchEngine(), + "tavily": TavilySearchEngine(), + "ddg": DDGSearchEngine(), + "bing": BingSearchEngine() + } + + async def execute(self, function_args: Dict[str, Any]) -> Dict[str, Any]: + query = function_args.get("query") + if not query: + return {"error": "搜索查询不能为空。"} + + # 获取当前文件路径用于缓存键 + import os + current_file_path = os.path.abspath(__file__) + + # 检查缓存 + cached_result = await tool_cache.get(self.name, function_args, current_file_path, semantic_query=query) + if cached_result: + logger.info(f"缓存命中: {self.name} -> {function_args}") + return cached_result + + # 读取搜索配置 + enabled_engines = config_api.get_global_config("web_search.enabled_engines", ["ddg"]) + search_strategy = config_api.get_global_config("web_search.search_strategy", "single") + + logger.info(f"开始搜索,策略: {search_strategy}, 启用引擎: {enabled_engines}, 参数: '{function_args}'") + + # 根据策略执行搜索 + if search_strategy == "parallel": + result = await self._execute_parallel_search(function_args, enabled_engines) + elif search_strategy == "fallback": + result = await self._execute_fallback_search(function_args, enabled_engines) + else: # single + result = await self._execute_single_search(function_args, enabled_engines) + + # 保存到缓存 + if "error" not in result: + await tool_cache.set(self.name, function_args, current_file_path, result, semantic_query=query) + + return result + + async def _execute_parallel_search(self, function_args: Dict[str, Any], enabled_engines: List[str]) -> Dict[str, Any]: + """并行搜索策略:同时使用所有启用的搜索引擎""" + search_tasks = [] + + for engine_name in enabled_engines: + engine = self.engines.get(engine_name) + if engine and engine.is_available(): + custom_args = function_args.copy() + custom_args["num_results"] = custom_args.get("num_results", 5) + search_tasks.append(engine.search(custom_args)) + + if not search_tasks: + return {"error": "没有可用的搜索引擎。"} + + try: + search_results_lists = await asyncio.gather(*search_tasks, return_exceptions=True) + + all_results = [] + for result in search_results_lists: + if isinstance(result, list): + all_results.extend(result) + elif isinstance(result, Exception): + logger.error(f"搜索时发生错误: {result}") + + # 去重并格式化 + unique_results = deduplicate_results(all_results) + formatted_content = format_search_results(unique_results) + + return { + "type": "web_search_result", + "content": formatted_content, + } + + except Exception as e: + logger.error(f"执行并行网络搜索时发生异常: {e}", exc_info=True) + return {"error": f"执行网络搜索时发生严重错误: {str(e)}"} + + async def _execute_fallback_search(self, function_args: Dict[str, Any], enabled_engines: List[str]) -> Dict[str, Any]: + """回退搜索策略:按顺序尝试搜索引擎,失败则尝试下一个""" + for engine_name in enabled_engines: + engine = self.engines.get(engine_name) + if not engine or not engine.is_available(): + continue + + try: + custom_args = function_args.copy() + custom_args["num_results"] = custom_args.get("num_results", 5) + + results = await engine.search(custom_args) + + if results: # 如果有结果,直接返回 + formatted_content = format_search_results(results) + return { + "type": "web_search_result", + "content": formatted_content, + } + + except Exception as e: + logger.warning(f"{engine_name} 搜索失败,尝试下一个引擎: {e}") + continue + + return {"error": "所有搜索引擎都失败了。"} + + async def _execute_single_search(self, function_args: Dict[str, Any], enabled_engines: List[str]) -> Dict[str, Any]: + """单一搜索策略:只使用第一个可用的搜索引擎""" + for engine_name in enabled_engines: + engine = self.engines.get(engine_name) + if not engine or not engine.is_available(): + continue + + try: + custom_args = function_args.copy() + custom_args["num_results"] = custom_args.get("num_results", 5) + + results = await engine.search(custom_args) + formatted_content = format_search_results(results) + return { + "type": "web_search_result", + "content": formatted_content, + } + + except Exception as e: + logger.error(f"{engine_name} 搜索失败: {e}") + return {"error": f"{engine_name} 搜索失败: {str(e)}"} + + return {"error": "没有可用的搜索引擎。"} diff --git a/src/plugins/built_in/web_search_tool/utils/__init__.py b/src/plugins/built_in/web_search_tool/utils/__init__.py new file mode 100644 index 000000000..8ebe2c35d --- /dev/null +++ b/src/plugins/built_in/web_search_tool/utils/__init__.py @@ -0,0 +1,3 @@ +""" +Web search tool utilities package +""" diff --git a/src/plugins/built_in/web_search_tool/utils/api_key_manager.py b/src/plugins/built_in/web_search_tool/utils/api_key_manager.py new file mode 100644 index 000000000..f8e0afa71 --- /dev/null +++ b/src/plugins/built_in/web_search_tool/utils/api_key_manager.py @@ -0,0 +1,84 @@ +""" +API密钥管理器,提供轮询机制 +""" +import itertools +from typing import List, Optional, TypeVar, Generic, Callable +from src.common.logger import get_logger + +logger = get_logger("api_key_manager") + +T = TypeVar('T') + + +class APIKeyManager(Generic[T]): + """ + API密钥管理器,支持轮询机制 + """ + + def __init__(self, api_keys: List[str], client_factory: Callable[[str], T], service_name: str = "Unknown"): + """ + 初始化API密钥管理器 + + Args: + api_keys: API密钥列表 + client_factory: 客户端工厂函数,接受API密钥参数并返回客户端实例 + service_name: 服务名称,用于日志记录 + """ + self.service_name = service_name + self.clients: List[T] = [] + self.client_cycle: Optional[itertools.cycle] = None + + if api_keys: + # 过滤有效的API密钥,排除None、空字符串、"None"字符串等 + valid_keys = [] + for key in api_keys: + if isinstance(key, str) and key.strip() and key.strip().lower() not in ("none", "null", ""): + valid_keys.append(key.strip()) + + if valid_keys: + try: + self.clients = [client_factory(key) for key in valid_keys] + self.client_cycle = itertools.cycle(self.clients) + logger.info(f"🔑 {service_name} 成功加载 {len(valid_keys)} 个 API 密钥") + except Exception as e: + logger.error(f"❌ 初始化 {service_name} 客户端失败: {e}") + self.clients = [] + self.client_cycle = None + else: + logger.warning(f"⚠️ {service_name} API Keys 配置无效(包含None或空值),{service_name} 功能将不可用") + else: + logger.warning(f"⚠️ {service_name} API Keys 未配置,{service_name} 功能将不可用") + + def is_available(self) -> bool: + """检查是否有可用的客户端""" + return bool(self.clients and self.client_cycle) + + def get_next_client(self) -> Optional[T]: + """获取下一个客户端(轮询)""" + if not self.is_available(): + return None + return next(self.client_cycle) + + def get_client_count(self) -> int: + """获取可用客户端数量""" + return len(self.clients) + + +def create_api_key_manager_from_config( + config_keys: Optional[List[str]], + client_factory: Callable[[str], T], + service_name: str +) -> APIKeyManager[T]: + """ + 从配置创建API密钥管理器的便捷函数 + + Args: + config_keys: 从配置读取的API密钥列表 + client_factory: 客户端工厂函数 + service_name: 服务名称 + + Returns: + API密钥管理器实例 + """ + api_keys = config_keys if isinstance(config_keys, list) else [] + return APIKeyManager(api_keys, client_factory, service_name) diff --git a/src/plugins/built_in/web_search_tool/utils/formatters.py b/src/plugins/built_in/web_search_tool/utils/formatters.py new file mode 100644 index 000000000..434f6f3c8 --- /dev/null +++ b/src/plugins/built_in/web_search_tool/utils/formatters.py @@ -0,0 +1,57 @@ +""" +Formatters for web search results +""" +from typing import List, Dict, Any + + +def format_search_results(results: List[Dict[str, Any]]) -> str: + """ + 格式化搜索结果为字符串 + """ + if not results: + return "没有找到相关的网络信息。" + + formatted_string = "根据网络搜索结果:\n\n" + for i, res in enumerate(results, 1): + title = res.get("title", '无标题') + url = res.get("url", '#') + snippet = res.get("snippet", '无摘要') + provider = res.get("provider", "未知来源") + + formatted_string += f"{i}. **{title}** (来自: {provider})\n" + formatted_string += f" - 摘要: {snippet}\n" + formatted_string += f" - 来源: {url}\n\n" + + return formatted_string + + +def format_url_parse_results(results: List[Dict[str, Any]]) -> str: + """ + 将成功解析的URL结果列表格式化为一段简洁的文本。 + """ + formatted_parts = [] + for res in results: + title = res.get('title', '无标题') + url = res.get('url', '#') + snippet = res.get('snippet', '无摘要') + source = res.get('source', '未知') + + formatted_string = f"**{title}**\n" + formatted_string += f"**内容摘要**:\n{snippet}\n" + formatted_string += f"**来源**: {url} (由 {source} 解析)\n" + formatted_parts.append(formatted_string) + + return "\n---\n".join(formatted_parts) + + +def deduplicate_results(results: List[Dict[str, Any]]) -> List[Dict[str, Any]]: + """ + 根据URL去重搜索结果 + """ + unique_urls = set() + unique_results = [] + for res in results: + if isinstance(res, dict) and res.get("url") and res["url"] not in unique_urls: + unique_urls.add(res["url"]) + unique_results.append(res) + return unique_results diff --git a/src/plugins/built_in/web_search_tool/utils/url_utils.py b/src/plugins/built_in/web_search_tool/utils/url_utils.py new file mode 100644 index 000000000..74afbc819 --- /dev/null +++ b/src/plugins/built_in/web_search_tool/utils/url_utils.py @@ -0,0 +1,39 @@ +""" +URL processing utilities +""" +import re +from typing import List + + +def parse_urls_from_input(urls_input) -> List[str]: + """ + 从输入中解析URL列表 + """ + if isinstance(urls_input, str): + # 如果是字符串,尝试解析为URL列表 + # 提取所有HTTP/HTTPS URL + url_pattern = r'https?://[^\s\],]+' + urls = re.findall(url_pattern, urls_input) + if not urls: + # 如果没有找到标准URL,将整个字符串作为单个URL + if urls_input.strip().startswith(('http://', 'https://')): + urls = [urls_input.strip()] + else: + return [] + elif isinstance(urls_input, list): + urls = [url.strip() for url in urls_input if isinstance(url, str) and url.strip()] + else: + return [] + + return urls + + +def validate_urls(urls: List[str]) -> List[str]: + """ + 验证URL格式,返回有效的URL列表 + """ + valid_urls = [] + for url in urls: + if url.startswith(('http://', 'https://')): + valid_urls.append(url) + return valid_urls From ceee3c1fbff075e093f1dd53c7b25df38dea0f78 Mon Sep 17 00:00:00 2001 From: minecraft1024a Date: Sat, 6 Sep 2025 20:09:17 +0800 Subject: [PATCH 2/4] =?UTF-8?q?refactor(chat):=20=E4=BC=98=E5=8C=96?= =?UTF-8?q?=E9=94=99=E5=88=AB=E5=AD=97=E7=94=9F=E6=88=90=E5=99=A8=E9=80=BB?= =?UTF-8?q?=E8=BE=91=E4=B8=8E=E6=96=87=E6=A1=A3?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit 对中文错别字生成器(`ChineseTypoGenerator`)进行了大规模重构和改进,以提升代码的可读性、可维护性和生成质量。 主要变更包括: - **逻辑拆分**: 将核心的单字替换逻辑从主函数 `create_typo_sentence` 中提取到新的私有方法 `_char_replace`,使主流程更清晰。 - **文档增强**: 全面重写和丰富了所有主要方法的文档字符串(docstrings),详细解释了每个参数的用途、函数的内部工作原理和设计决策,显著提高了代码的可理解性。 - **代码简化**: 优化了同音词的查找逻辑(`_get_word_homophones`),移除了复杂的评分和文件读取过程,直接利用 `jieba.dt.FREQ` 进行有效性验证,使代码更简洁高效。 - **健壮性提升**: 在拼音转换和处理逻辑中增加了更具体的异常捕获(`IndexError`, `TypeError`),提高了代码的稳定性。 - **修正建议格式**: 将修正建议的返回格式从单个字符串更改为 `(错字/词, 正确字/词)` 的元组,提供了更完整的上下文信息。 此外,在 `generator_api.py` 中移除了一段冗余的内容类型检查代码。 --- src/chat/utils/typo_generator.py | 221 +++++++++++++----------- src/plugin_system/apis/generator_api.py | 2 - 2 files changed, 121 insertions(+), 102 deletions(-) diff --git a/src/chat/utils/typo_generator.py b/src/chat/utils/typo_generator.py index c23c4c319..1f2f9c346 100644 --- a/src/chat/utils/typo_generator.py +++ b/src/chat/utils/typo_generator.py @@ -2,12 +2,14 @@ 错别字生成器 - 基于拼音和字频的中文错别字生成工具 """ -import orjson +import itertools import math import os import random import time + import jieba +import orjson from collections import defaultdict from pathlib import Path @@ -30,11 +32,16 @@ class ChineseTypoGenerator: 初始化错别字生成器。 Args: - error_rate (float): 单个汉字被替换为同音字的概率。 - min_freq (int): 候选替换字的最小词频阈值,低于此阈值的字将被忽略。 - tone_error_rate (float): 在选择同音字时,使用错误声调的概率。 - word_replace_rate (float): 整个词语被替换为同音词的概率。 - max_freq_diff (int): 允许的原始字与替换字之间的最大频率差异。 + error_rate (float): 控制单个汉字被替换为错别字的基础概率。 + 这个概率会根据词语长度进行调整,词语越长,单个字出错的概率越低。 + min_freq (int): 候选替换字的最小词频阈值。一个汉字的频率低于此值时, + 它不会被选为替换候选字,以避免生成过于生僻的错别字。 + tone_error_rate (float): 在寻找同音字时,有多大的概率会故意使用一个错误的声调来寻找候选字。 + 这可以模拟常见的声调错误,如 "shí" -> "shì"。 + word_replace_rate (float): 控制一个多字词语被整体替换为同音词的概率。 + 例如,“天气” -> “天气”。 + max_freq_diff (int): 允许的原始字与替换字之间的最大归一化频率差异。 + 用于确保替换字与原字的常用程度相似,避免用非常常见的字替换罕见字,反之亦然。 """ self.error_rate = error_rate self.min_freq = min_freq @@ -51,11 +58,12 @@ class ChineseTypoGenerator: def _load_or_create_char_frequency(self): """ 加载或创建汉字频率字典。 - 如果存在缓存文件 `depends-data/char_frequency.json`,则直接加载。 - 否则,通过解析 `jieba` 的词典文件来创建,并保存为缓存。 + 如果存在缓存文件 `depends-data/char_frequency.json`,则直接加载以提高启动速度。 + 否则,通过解析 `jieba` 的内置词典文件 `dict.txt` 来创建。 + 创建过程中,会统计每个汉字的累计频率,并进行归一化处理,然后保存为缓存文件。 Returns: - dict: 一个将汉字映射到其归一化频率的字典。 + dict: 一个将汉字映射到其归一化频率(0-1000范围)的字典。 """ cache_file = Path("depends-data/char_frequency.json") @@ -92,10 +100,13 @@ class ChineseTypoGenerator: def _create_pinyin_dict(): """ 创建从拼音到汉字的映射字典。 - 遍历常用汉字范围,为每个汉字生成带声调的拼音,并构建映射。 + 该方法会遍历 Unicode 中定义的常用汉字范围 (U+4E00 至 U+9FFF), + 为每个汉字生成带数字声调的拼音(例如 'hao3'),并构建一个从拼音到包含该拼音的所有汉字列表的映射。 + 这个字典是生成同音字和同音词的基础。 Returns: - defaultdict: 一个将拼音映射到汉字列表的字典。 + defaultdict: 一个将拼音字符串映射到汉字字符列表的字典。 + 例如: {'hao3': ['好', '郝', ...]} """ # 定义常用汉字的Unicode范围 chars = [chr(i) for i in range(0x4E00, 0x9FFF)] @@ -104,11 +115,10 @@ class ChineseTypoGenerator: # 为范围内的每个汉字建立拼音到汉字的映射 for char in chars: try: - # 获取带数字声调的拼音 (e.g., 'hao3') py = pinyin(char, style=Style.TONE3) - pinyin_dict[py].append(char) - except Exception: - # 忽略无法转换拼音的字符 + if py: + pinyin_dict[py].append(char) + except (IndexError, TypeError): continue return pinyin_dict @@ -144,23 +154,28 @@ class ChineseTypoGenerator: characters = list(sentence) result = [] for char in characters: - # 忽略所有非中文字符 if self._is_chinese_char(char): - # 获取带数字声调的拼音 - py = pinyin(char, style=Style.TONE3) - result.append((char, py)) + try: + py = pinyin(char, style=Style.TONE3) + if py: + result.append((char, py)) + except (IndexError, TypeError): + continue return result @staticmethod def _get_similar_tone_pinyin(py): """ 为一个给定的拼音生成一个声调错误的相似拼音。 + 例如,输入 'hao3',可能返回 'hao1'、'hao2' 或 'hao4'。 + 此函数用于模拟中文输入时常见的声调错误。 + 对于轻声(拼音末尾无数字声调),会随机分配一个声调。 Args: py (str): 带数字声调的原始拼音 (e.g., 'hao3')。 Returns: - str: 一个声调被随机改变的拼音。 + str: 一个声调被随机改变的新拼音。 """ # 检查拼音是否有效 if not py or len(py) < 1: @@ -186,11 +201,15 @@ class ChineseTypoGenerator: def _calculate_replacement_probability(self, orig_freq, target_freq): """ 根据原始字和目标替换字的频率差异,计算替换概率。 - 频率相近的字有更高的替换概率。 + 这个概率模型遵循以下原则: + 1. 如果目标字比原始字更常用,替换概率为 1.0(倾向于换成更常见的字)。 + 2. 如果频率差异超过 `max_freq_diff` 阈值,替换概率为 0.0。 + 3. 否则,使用指数衰减函数计算概率,频率差异越大,替换概率越低。 + 这使得替换更倾向于选择频率相近的字。 Args: - orig_freq (float): 原始字的频率。 - target_freq (float): 目标替换字的频率。 + orig_freq (float): 原始字的归一化频率。 + target_freq (float): 目标替换字的归一化频率。 Returns: float: 替换概率,介于 0.0 和 1.0 之间。 @@ -210,14 +229,19 @@ class ChineseTypoGenerator: def _get_similar_frequency_chars(self, char, py, num_candidates=5): """ 获取与给定汉字发音相似且频率相近的候选替换字。 + 此方法首先根据 `tone_error_rate` 决定是否寻找声调错误的同音字, + 然后合并声调正确的同音字。接着,根据字频进行过滤和排序: + 1. 移除原始字本身和频率低于 `min_freq` 的字。 + 2. 计算每个候选字的替换概率。 + 3. 按替换概率降序排序,并返回前 `num_candidates` 个候选字。 Args: char (str): 原始汉字。 py (str): 原始汉字的拼音。 - num_candidates (int): 返回的候选字数量。 + num_candidates (int): 返回的候选字数量上限。 Returns: - list or None: 一个包含候选替换字的列表,如果没有找到则返回 None。 + list or None: 一个包含候选替换字的列表,如果没有找到合适的候选字则返回 None。 """ homophones = [] @@ -254,7 +278,6 @@ class ChineseTypoGenerator: if not candidates_with_prob: return None - # 根据替换概率从高到低排序 candidates_with_prob.sort(key=lambda x: x, reverse=True) # 返回概率最高的几个候选字 @@ -269,9 +292,9 @@ class ChineseTypoGenerator: word (str): 输入的词语。 Returns: - list: 包含每个汉字拼音的列表。 + List[str]: 包含每个汉字拼音的列表。 """ - return [py for py in pinyin(word, style=Style.TONE3)] + return ["".join(p) for p in pinyin(word, style=Style.TONE3)] @staticmethod def _segment_sentence(sentence): @@ -288,14 +311,17 @@ class ChineseTypoGenerator: def _get_word_homophones(self, word): """ - 获取一个词语的同音词。 - 只返回在jieba词典中存在且频率较高的有意义词语。 + 获取一个词语的所有同音词。 + 该方法首先获取词语中每个字的拼音,然后为每个字找到所有同音字。 + 接着,使用 `itertools.product` 生成所有可能的同音字组合。 + 最后,过滤掉原始词本身,并只保留在 `jieba` 词典中存在的、有意义的词语。 + 这可以有效避免生成无意义的同音词组合。 Args: word (str): 原始词语。 Returns: - list: 一个包含同音词的列表。 + List[str]: 一个包含所有有效同音词的列表。 """ if len(word) <= 1: return [] @@ -310,45 +336,28 @@ class ChineseTypoGenerator: return [] # 如果某个字没有同音字,则无法构成同音词 candidates.append(chars) - # 生成所有可能的同音字组合 - import itertools - all_combinations = itertools.product(*candidates) + homophones = [ + "".join(combo) + for combo in all_combinations + if ("".join(combo) != word and "".join(combo) in jieba.dt.FREQ) + ] - # 加载jieba词典以验证组合出的词是否为有效词语 - dict_path = os.path.join(os.path.dirname(jieba.__file__), "dict.txt") - valid_words = {} - with open(dict_path, "r", encoding="utf-8") as f: - for line in f: - parts = line.strip().split() - if len(parts) >= 2: - valid_words[parts] = float(parts[0][1]) - - original_word_freq = valid_words.get(word, 0) - # 设置一个最小词频阈值,过滤掉非常生僻的词 - min_word_freq = original_word_freq * 0.1 - - homophones = [] - for combo in all_combinations: - new_word = "".join(combo) - # 检查新词是否为有效词语且与原词不同 - if new_word != word and new_word in valid_words: - new_word_freq = valid_words[new_word] - if new_word_freq >= min_word_freq: - # 计算综合评分,结合词频和平均字频 - char_avg_freq = sum(self.char_frequency.get(c, 0) for c in new_word) / len(new_word) - combined_score = new_word_freq * 0.7 + char_avg_freq * 0.3 - if combined_score >= self.min_freq: - homophones.append((new_word, combined_score)) - - # 按综合分数排序并返回前5个结果 - sorted_homophones = sorted(homophones, key=lambda x: x, reverse=True) - return [w for w, _ in sorted_homophones[:5]] + return homophones def create_typo_sentence(self, sentence): """ 为输入句子生成一个包含错别字的版本。 - 该方法会先对句子进行分词,然后根据概率进行整词替换或单字替换。 + 这是核心的错别字生成方法,其流程如下: + 1. 使用 jieba 对输入句子进行分词。 + 2. 遍历每个词语: + a. 如果词语长度大于1,根据 `word_replace_rate` 概率尝试进行整词替换。 + 如果找到了合适的同音词,则替换并跳过后续步骤。 + b. 如果不进行整词替换,则遍历词语中的每个汉字。 + c. 对每个汉字,调用 `_char_replace` 方法,根据 `error_rate` 和词语长度调整后的概率, + 决定是否进行单字替换。 + 3. 将处理后的词语拼接成最终的错别字句子。 + 4. 从所有发生的替换中,随机选择一个作为修正建议返回。 Args: sentence (str): 原始中文句子。 @@ -357,7 +366,7 @@ class ChineseTypoGenerator: tuple: 包含三个元素的元组: - original_sentence (str): 原始句子。 - typo_sentence (str): 包含错别字的句子。 - - correction_suggestion (str or None): 一个随机的修正建议(可能是正确的字或词),或 None。 + - correction_suggestion (Optional[tuple(str, str)]): 一个随机的修正建议,格式为 (错字/词, 正确字/词),或 None。 """ result = [] typo_info = [] # 用于调试,记录详细的替换信息 @@ -397,44 +406,56 @@ class ChineseTypoGenerator: word_typos.append((typo_word, word)) continue - # 步骤2: 如果不进行整词替换,则对词中的每个字进行单字替换 - new_word = [] - for char, py in zip(word, word_pinyin, strict=False): - # 词语越长,其中单个字被替换的概率越低 - char_error_rate = self.error_rate * (0.7 ** (len(word) - 1)) - if random.random() < char_error_rate: - similar_chars = self._get_similar_frequency_chars(char, py) - if similar_chars: - typo_char = random.choice(similar_chars) - orig_freq = self.char_frequency.get(char, 0) - typo_freq = self.char_frequency.get(typo_char, 0) - # 根据频率计算最终是否替换 - if random.random() < self._calculate_replacement_probability(orig_freq, typo_freq): - new_word.append(typo_char) - typo_py = pinyin(typo_char, style=Style.TONE3) - typo_info.append((char, typo_char, py, typo_py, orig_freq, typo_freq)) - char_typos.append((typo_char, char)) - continue - # 如果不替换,则保留原字 - new_word.append(char) - - result.append("".join(new_word)) - - # 步骤3: 生成修正建议 - correction_suggestion = None - # 有50%的概率提供一个修正建议 - if random.random() < 0.5: - # 优先从整词错误中选择 - if word_typos: - _, correct_word = random.choice(word_typos) - correction_suggestion = correct_word - # 其次从单字错误中选择 - elif char_typos: - _, correct_char = random.choice(char_typos) - correction_suggestion = correct_char + new_word = "".join( + self._char_replace(char, py, len(word), typo_info, char_typos) for char, py in zip(word, word_pinyin) + ) + result.append(new_word) + all_typos = word_typos + char_typos + correction_suggestion = random.choice(all_typos) if all_typos and random.random() < 0.5 else None return sentence, "".join(result), correction_suggestion + def _char_replace(self, char, py, word_len, typo_info, char_typos): + """ + 根据概率替换单个汉字。 + 这个内部方法处理单个汉字的替换逻辑。 + + Args: + char (str): 要处理的原始汉字。 + py (str): 原始汉字的拼音。 + word_len (int): 原始汉字所在的词语的长度。 + typo_info (list): 用于记录详细调试信息的列表。 + char_typos (list): 用于记录 (错字, 正确字) 的列表。 + + Returns: + str: 替换后的汉字(可能是原汉字,也可能是错别字)。 + """ + # 根据词语长度调整错误率:词语越长,单个字出错的概率越低。 + # 这是一个启发式规则,模拟人们在输入长词时更不容易打错单个字。 + char_error_rate = self.error_rate * (0.7 ** (word_len - 1)) + if random.random() >= char_error_rate: + return char + + # 获取发音和频率都相似的候选错别字 + similar_chars = self._get_similar_frequency_chars(char, py) + if not similar_chars: + return char + + # 从候选列表中随机选择一个 + typo_char = random.choice(similar_chars) + orig_freq = self.char_frequency.get(char, 0) + typo_freq = self.char_frequency.get(typo_char, 0) + + # 根据频率差异再次进行概率判断,决定是否执行替换 + if random.random() >= self._calculate_replacement_probability(orig_freq, typo_freq): + return char + + # 执行替换,并记录相关信息 + typo_py = pinyin(typo_char, style=Style.TONE3) + typo_info.append((char, typo_char, py, typo_py, orig_freq, typo_freq)) + char_typos.append((typo_char, char)) + return typo_char + @staticmethod def format_typo_info(typo_info): """ diff --git a/src/plugin_system/apis/generator_api.py b/src/plugin_system/apis/generator_api.py index 8d8d04eb3..7f3074a81 100644 --- a/src/plugin_system/apis/generator_api.py +++ b/src/plugin_system/apis/generator_api.py @@ -239,8 +239,6 @@ def process_human_text( enable_splitter: 是否启用消息分割器 enable_chinese_typo: 是否启用错字生成器 """ - if isinstance(content, list): - content = "".join(map(str, content)) if not isinstance(content, str): raise ValueError("content 必须是字符串类型") try: From e848f89c59c9b285661cffd5fb1b775861691409 Mon Sep 17 00:00:00 2001 From: minecraft1024a Date: Sat, 6 Sep 2025 20:16:49 +0800 Subject: [PATCH 3/4] =?UTF-8?q?Revert=20"refactor(chat):=20=E4=BC=98?= =?UTF-8?q?=E5=8C=96=E9=94=99=E5=88=AB=E5=AD=97=E7=94=9F=E6=88=90=E5=99=A8?= =?UTF-8?q?=E9=80=BB=E8=BE=91=E4=B8=8E=E6=96=87=E6=A1=A3"?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit This reverts commit ceee3c1fbff075e093f1dd53c7b25df38dea0f78. --- src/chat/utils/typo_generator.py | 223 +++++++++++------------- src/plugin_system/apis/generator_api.py | 2 + 2 files changed, 103 insertions(+), 122 deletions(-) diff --git a/src/chat/utils/typo_generator.py b/src/chat/utils/typo_generator.py index 1f2f9c346..c23c4c319 100644 --- a/src/chat/utils/typo_generator.py +++ b/src/chat/utils/typo_generator.py @@ -2,14 +2,12 @@ 错别字生成器 - 基于拼音和字频的中文错别字生成工具 """ -import itertools +import orjson import math import os import random import time - import jieba -import orjson from collections import defaultdict from pathlib import Path @@ -32,16 +30,11 @@ class ChineseTypoGenerator: 初始化错别字生成器。 Args: - error_rate (float): 控制单个汉字被替换为错别字的基础概率。 - 这个概率会根据词语长度进行调整,词语越长,单个字出错的概率越低。 - min_freq (int): 候选替换字的最小词频阈值。一个汉字的频率低于此值时, - 它不会被选为替换候选字,以避免生成过于生僻的错别字。 - tone_error_rate (float): 在寻找同音字时,有多大的概率会故意使用一个错误的声调来寻找候选字。 - 这可以模拟常见的声调错误,如 "shí" -> "shì"。 - word_replace_rate (float): 控制一个多字词语被整体替换为同音词的概率。 - 例如,“天气” -> “天气”。 - max_freq_diff (int): 允许的原始字与替换字之间的最大归一化频率差异。 - 用于确保替换字与原字的常用程度相似,避免用非常常见的字替换罕见字,反之亦然。 + error_rate (float): 单个汉字被替换为同音字的概率。 + min_freq (int): 候选替换字的最小词频阈值,低于此阈值的字将被忽略。 + tone_error_rate (float): 在选择同音字时,使用错误声调的概率。 + word_replace_rate (float): 整个词语被替换为同音词的概率。 + max_freq_diff (int): 允许的原始字与替换字之间的最大频率差异。 """ self.error_rate = error_rate self.min_freq = min_freq @@ -58,12 +51,11 @@ class ChineseTypoGenerator: def _load_or_create_char_frequency(self): """ 加载或创建汉字频率字典。 - 如果存在缓存文件 `depends-data/char_frequency.json`,则直接加载以提高启动速度。 - 否则,通过解析 `jieba` 的内置词典文件 `dict.txt` 来创建。 - 创建过程中,会统计每个汉字的累计频率,并进行归一化处理,然后保存为缓存文件。 + 如果存在缓存文件 `depends-data/char_frequency.json`,则直接加载。 + 否则,通过解析 `jieba` 的词典文件来创建,并保存为缓存。 Returns: - dict: 一个将汉字映射到其归一化频率(0-1000范围)的字典。 + dict: 一个将汉字映射到其归一化频率的字典。 """ cache_file = Path("depends-data/char_frequency.json") @@ -100,13 +92,10 @@ class ChineseTypoGenerator: def _create_pinyin_dict(): """ 创建从拼音到汉字的映射字典。 - 该方法会遍历 Unicode 中定义的常用汉字范围 (U+4E00 至 U+9FFF), - 为每个汉字生成带数字声调的拼音(例如 'hao3'),并构建一个从拼音到包含该拼音的所有汉字列表的映射。 - 这个字典是生成同音字和同音词的基础。 + 遍历常用汉字范围,为每个汉字生成带声调的拼音,并构建映射。 Returns: - defaultdict: 一个将拼音字符串映射到汉字字符列表的字典。 - 例如: {'hao3': ['好', '郝', ...]} + defaultdict: 一个将拼音映射到汉字列表的字典。 """ # 定义常用汉字的Unicode范围 chars = [chr(i) for i in range(0x4E00, 0x9FFF)] @@ -115,10 +104,11 @@ class ChineseTypoGenerator: # 为范围内的每个汉字建立拼音到汉字的映射 for char in chars: try: + # 获取带数字声调的拼音 (e.g., 'hao3') py = pinyin(char, style=Style.TONE3) - if py: - pinyin_dict[py].append(char) - except (IndexError, TypeError): + pinyin_dict[py].append(char) + except Exception: + # 忽略无法转换拼音的字符 continue return pinyin_dict @@ -154,28 +144,23 @@ class ChineseTypoGenerator: characters = list(sentence) result = [] for char in characters: + # 忽略所有非中文字符 if self._is_chinese_char(char): - try: - py = pinyin(char, style=Style.TONE3) - if py: - result.append((char, py)) - except (IndexError, TypeError): - continue + # 获取带数字声调的拼音 + py = pinyin(char, style=Style.TONE3) + result.append((char, py)) return result @staticmethod def _get_similar_tone_pinyin(py): """ 为一个给定的拼音生成一个声调错误的相似拼音。 - 例如,输入 'hao3',可能返回 'hao1'、'hao2' 或 'hao4'。 - 此函数用于模拟中文输入时常见的声调错误。 - 对于轻声(拼音末尾无数字声调),会随机分配一个声调。 Args: py (str): 带数字声调的原始拼音 (e.g., 'hao3')。 Returns: - str: 一个声调被随机改变的新拼音。 + str: 一个声调被随机改变的拼音。 """ # 检查拼音是否有效 if not py or len(py) < 1: @@ -201,15 +186,11 @@ class ChineseTypoGenerator: def _calculate_replacement_probability(self, orig_freq, target_freq): """ 根据原始字和目标替换字的频率差异,计算替换概率。 - 这个概率模型遵循以下原则: - 1. 如果目标字比原始字更常用,替换概率为 1.0(倾向于换成更常见的字)。 - 2. 如果频率差异超过 `max_freq_diff` 阈值,替换概率为 0.0。 - 3. 否则,使用指数衰减函数计算概率,频率差异越大,替换概率越低。 - 这使得替换更倾向于选择频率相近的字。 + 频率相近的字有更高的替换概率。 Args: - orig_freq (float): 原始字的归一化频率。 - target_freq (float): 目标替换字的归一化频率。 + orig_freq (float): 原始字的频率。 + target_freq (float): 目标替换字的频率。 Returns: float: 替换概率,介于 0.0 和 1.0 之间。 @@ -229,19 +210,14 @@ class ChineseTypoGenerator: def _get_similar_frequency_chars(self, char, py, num_candidates=5): """ 获取与给定汉字发音相似且频率相近的候选替换字。 - 此方法首先根据 `tone_error_rate` 决定是否寻找声调错误的同音字, - 然后合并声调正确的同音字。接着,根据字频进行过滤和排序: - 1. 移除原始字本身和频率低于 `min_freq` 的字。 - 2. 计算每个候选字的替换概率。 - 3. 按替换概率降序排序,并返回前 `num_candidates` 个候选字。 Args: char (str): 原始汉字。 py (str): 原始汉字的拼音。 - num_candidates (int): 返回的候选字数量上限。 + num_candidates (int): 返回的候选字数量。 Returns: - list or None: 一个包含候选替换字的列表,如果没有找到合适的候选字则返回 None。 + list or None: 一个包含候选替换字的列表,如果没有找到则返回 None。 """ homophones = [] @@ -278,6 +254,7 @@ class ChineseTypoGenerator: if not candidates_with_prob: return None + # 根据替换概率从高到低排序 candidates_with_prob.sort(key=lambda x: x, reverse=True) # 返回概率最高的几个候选字 @@ -292,9 +269,9 @@ class ChineseTypoGenerator: word (str): 输入的词语。 Returns: - List[str]: 包含每个汉字拼音的列表。 + list: 包含每个汉字拼音的列表。 """ - return ["".join(p) for p in pinyin(word, style=Style.TONE3)] + return [py for py in pinyin(word, style=Style.TONE3)] @staticmethod def _segment_sentence(sentence): @@ -311,17 +288,14 @@ class ChineseTypoGenerator: def _get_word_homophones(self, word): """ - 获取一个词语的所有同音词。 - 该方法首先获取词语中每个字的拼音,然后为每个字找到所有同音字。 - 接着,使用 `itertools.product` 生成所有可能的同音字组合。 - 最后,过滤掉原始词本身,并只保留在 `jieba` 词典中存在的、有意义的词语。 - 这可以有效避免生成无意义的同音词组合。 + 获取一个词语的同音词。 + 只返回在jieba词典中存在且频率较高的有意义词语。 Args: word (str): 原始词语。 Returns: - List[str]: 一个包含所有有效同音词的列表。 + list: 一个包含同音词的列表。 """ if len(word) <= 1: return [] @@ -336,28 +310,45 @@ class ChineseTypoGenerator: return [] # 如果某个字没有同音字,则无法构成同音词 candidates.append(chars) - all_combinations = itertools.product(*candidates) - homophones = [ - "".join(combo) - for combo in all_combinations - if ("".join(combo) != word and "".join(combo) in jieba.dt.FREQ) - ] + # 生成所有可能的同音字组合 + import itertools - return homophones + all_combinations = itertools.product(*candidates) + + # 加载jieba词典以验证组合出的词是否为有效词语 + dict_path = os.path.join(os.path.dirname(jieba.__file__), "dict.txt") + valid_words = {} + with open(dict_path, "r", encoding="utf-8") as f: + for line in f: + parts = line.strip().split() + if len(parts) >= 2: + valid_words[parts] = float(parts[0][1]) + + original_word_freq = valid_words.get(word, 0) + # 设置一个最小词频阈值,过滤掉非常生僻的词 + min_word_freq = original_word_freq * 0.1 + + homophones = [] + for combo in all_combinations: + new_word = "".join(combo) + # 检查新词是否为有效词语且与原词不同 + if new_word != word and new_word in valid_words: + new_word_freq = valid_words[new_word] + if new_word_freq >= min_word_freq: + # 计算综合评分,结合词频和平均字频 + char_avg_freq = sum(self.char_frequency.get(c, 0) for c in new_word) / len(new_word) + combined_score = new_word_freq * 0.7 + char_avg_freq * 0.3 + if combined_score >= self.min_freq: + homophones.append((new_word, combined_score)) + + # 按综合分数排序并返回前5个结果 + sorted_homophones = sorted(homophones, key=lambda x: x, reverse=True) + return [w for w, _ in sorted_homophones[:5]] def create_typo_sentence(self, sentence): """ 为输入句子生成一个包含错别字的版本。 - 这是核心的错别字生成方法,其流程如下: - 1. 使用 jieba 对输入句子进行分词。 - 2. 遍历每个词语: - a. 如果词语长度大于1,根据 `word_replace_rate` 概率尝试进行整词替换。 - 如果找到了合适的同音词,则替换并跳过后续步骤。 - b. 如果不进行整词替换,则遍历词语中的每个汉字。 - c. 对每个汉字,调用 `_char_replace` 方法,根据 `error_rate` 和词语长度调整后的概率, - 决定是否进行单字替换。 - 3. 将处理后的词语拼接成最终的错别字句子。 - 4. 从所有发生的替换中,随机选择一个作为修正建议返回。 + 该方法会先对句子进行分词,然后根据概率进行整词替换或单字替换。 Args: sentence (str): 原始中文句子。 @@ -366,7 +357,7 @@ class ChineseTypoGenerator: tuple: 包含三个元素的元组: - original_sentence (str): 原始句子。 - typo_sentence (str): 包含错别字的句子。 - - correction_suggestion (Optional[tuple(str, str)]): 一个随机的修正建议,格式为 (错字/词, 正确字/词),或 None。 + - correction_suggestion (str or None): 一个随机的修正建议(可能是正确的字或词),或 None。 """ result = [] typo_info = [] # 用于调试,记录详细的替换信息 @@ -406,56 +397,44 @@ class ChineseTypoGenerator: word_typos.append((typo_word, word)) continue - new_word = "".join( - self._char_replace(char, py, len(word), typo_info, char_typos) for char, py in zip(word, word_pinyin) - ) - result.append(new_word) + # 步骤2: 如果不进行整词替换,则对词中的每个字进行单字替换 + new_word = [] + for char, py in zip(word, word_pinyin, strict=False): + # 词语越长,其中单个字被替换的概率越低 + char_error_rate = self.error_rate * (0.7 ** (len(word) - 1)) + if random.random() < char_error_rate: + similar_chars = self._get_similar_frequency_chars(char, py) + if similar_chars: + typo_char = random.choice(similar_chars) + orig_freq = self.char_frequency.get(char, 0) + typo_freq = self.char_frequency.get(typo_char, 0) + # 根据频率计算最终是否替换 + if random.random() < self._calculate_replacement_probability(orig_freq, typo_freq): + new_word.append(typo_char) + typo_py = pinyin(typo_char, style=Style.TONE3) + typo_info.append((char, typo_char, py, typo_py, orig_freq, typo_freq)) + char_typos.append((typo_char, char)) + continue + # 如果不替换,则保留原字 + new_word.append(char) + + result.append("".join(new_word)) + + # 步骤3: 生成修正建议 + correction_suggestion = None + # 有50%的概率提供一个修正建议 + if random.random() < 0.5: + # 优先从整词错误中选择 + if word_typos: + _, correct_word = random.choice(word_typos) + correction_suggestion = correct_word + # 其次从单字错误中选择 + elif char_typos: + _, correct_char = random.choice(char_typos) + correction_suggestion = correct_char - all_typos = word_typos + char_typos - correction_suggestion = random.choice(all_typos) if all_typos and random.random() < 0.5 else None return sentence, "".join(result), correction_suggestion - def _char_replace(self, char, py, word_len, typo_info, char_typos): - """ - 根据概率替换单个汉字。 - 这个内部方法处理单个汉字的替换逻辑。 - - Args: - char (str): 要处理的原始汉字。 - py (str): 原始汉字的拼音。 - word_len (int): 原始汉字所在的词语的长度。 - typo_info (list): 用于记录详细调试信息的列表。 - char_typos (list): 用于记录 (错字, 正确字) 的列表。 - - Returns: - str: 替换后的汉字(可能是原汉字,也可能是错别字)。 - """ - # 根据词语长度调整错误率:词语越长,单个字出错的概率越低。 - # 这是一个启发式规则,模拟人们在输入长词时更不容易打错单个字。 - char_error_rate = self.error_rate * (0.7 ** (word_len - 1)) - if random.random() >= char_error_rate: - return char - - # 获取发音和频率都相似的候选错别字 - similar_chars = self._get_similar_frequency_chars(char, py) - if not similar_chars: - return char - - # 从候选列表中随机选择一个 - typo_char = random.choice(similar_chars) - orig_freq = self.char_frequency.get(char, 0) - typo_freq = self.char_frequency.get(typo_char, 0) - - # 根据频率差异再次进行概率判断,决定是否执行替换 - if random.random() >= self._calculate_replacement_probability(orig_freq, typo_freq): - return char - - # 执行替换,并记录相关信息 - typo_py = pinyin(typo_char, style=Style.TONE3) - typo_info.append((char, typo_char, py, typo_py, orig_freq, typo_freq)) - char_typos.append((typo_char, char)) - return typo_char - @staticmethod def format_typo_info(typo_info): """ diff --git a/src/plugin_system/apis/generator_api.py b/src/plugin_system/apis/generator_api.py index 7f3074a81..8d8d04eb3 100644 --- a/src/plugin_system/apis/generator_api.py +++ b/src/plugin_system/apis/generator_api.py @@ -239,6 +239,8 @@ def process_human_text( enable_splitter: 是否启用消息分割器 enable_chinese_typo: 是否启用错字生成器 """ + if isinstance(content, list): + content = "".join(map(str, content)) if not isinstance(content, str): raise ValueError("content 必须是字符串类型") try: From c097b5d00b27fb7f6ae2be76e7c5a79019259454 Mon Sep 17 00:00:00 2001 From: minecraft1024a Date: Sat, 6 Sep 2025 20:23:24 +0800 Subject: [PATCH 4/4] =?UTF-8?q?feat(config):=20=E6=94=AF=E6=8C=81API?= =?UTF-8?q?=E5=AF=86=E9=92=A5=E5=88=97=E8=A1=A8=E8=BD=AE=E8=AF=A2?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - 将`api_key`类型从`str`扩展为`Union[str, List[str]]`,允许用户配置多个API密钥。 - 新增`get_api_key`方法,通过线程安全的方式实现API密钥的轮询使用,提高API请求的稳定性和可用性。 - 更新了`api_key`的验证逻辑,以同时支持字符串和字符串列表两种格式。 - 相应地更新了配置文件模板,以示例新的密钥列表配置方式。 --- src/config/api_ada_configs.py | 32 ++++++++++++++++++++++++----- template/model_config_template.toml | 4 ++-- 2 files changed, 29 insertions(+), 7 deletions(-) diff --git a/src/config/api_ada_configs.py b/src/config/api_ada_configs.py index 5e53eec4b..cc25d0646 100644 --- a/src/config/api_ada_configs.py +++ b/src/config/api_ada_configs.py @@ -1,5 +1,6 @@ -from typing import List, Dict, Any, Literal +from typing import List, Dict, Any, Literal, Union from pydantic import Field, field_validator +from threading import Lock from src.config.config_base import ValidatedConfigBase @@ -9,7 +10,7 @@ class APIProvider(ValidatedConfigBase): name: str = Field(..., min_length=1, description="API提供商名称") base_url: str = Field(..., description="API基础URL") - api_key: str = Field(..., min_length=1, description="API密钥") + api_key: Union[str, List[str]] = Field(..., min_length=1, description="API密钥,支持单个密钥或密钥列表轮询") client_type: Literal["openai", "gemini", "aiohttp_gemini"] = Field( default="openai", description="客户端类型(如openai/google等,默认为openai)" ) @@ -33,12 +34,33 @@ class APIProvider(ValidatedConfigBase): @classmethod def validate_api_key(cls, v): """验证API密钥不能为空""" - if not v or not v.strip(): - raise ValueError("API密钥不能为空") + if isinstance(v, str): + if not v.strip(): + raise ValueError("API密钥不能为空") + elif isinstance(v, list): + if not v: + raise ValueError("API密钥列表不能为空") + for key in v: + if not isinstance(key, str) or not key.strip(): + raise ValueError("API密钥列表中的密钥不能为空") + else: + raise ValueError("API密钥必须是字符串或字符串列表") return v + def __init__(self, **data): + super().__init__(**data) + self._api_key_lock = Lock() + self._api_key_index = 0 + def get_api_key(self) -> str: - return self.api_key + with self._api_key_lock: + if isinstance(self.api_key, str): + return self.api_key + if not self.api_key: + raise ValueError("API密钥列表为空") + key = self.api_key[self._api_key_index] + self._api_key_index = (self._api_key_index + 1) % len(self.api_key) + return key class ModelInfo(ValidatedConfigBase): diff --git a/template/model_config_template.toml b/template/model_config_template.toml index fab3ee509..c5f2a2947 100644 --- a/template/model_config_template.toml +++ b/template/model_config_template.toml @@ -6,7 +6,7 @@ version = "1.3.1" [[api_providers]] # API服务提供商(可以配置多个) name = "DeepSeek" # API服务商名称(可随意命名,在models的api-provider中需使用这个命名) base_url = "https://api.deepseek.com/v1" # API服务商的BaseURL -api_key = "your-api-key-here" # API密钥(请替换为实际的API密钥) +api_key = ["your-api-key-here-1", "your-api-key-here-2"] # API密钥(支持单个密钥或密钥列表轮询) client_type = "openai" # 请求客户端(可选,默认值为"openai",使用gimini等Google系模型时请配置为"gemini") max_retry = 2 # 最大重试次数(单个模型API调用失败,最多重试的次数) timeout = 30 # API请求超时时间(单位:秒) @@ -24,7 +24,7 @@ retry_interval = 10 [[api_providers]] # 特殊:Google的Gimini使用特殊API,与OpenAI格式不兼容,需要配置client为"aiohttp_gemini" name = "Google" base_url = "https://api.google.com/v1" -api_key = "your-google-api-key-1" +api_key = ["your-google-api-key-1", "your-google-api-key-2"] client_type = "aiohttp_gemini" # 官方的gemini客户端现在已经死了 max_retry = 2 timeout = 30