雅诺狐
2025-08-21 21:31:47 +08:00
28 changed files with 756 additions and 870 deletions

bot.py
View File

@@ -1,13 +1,11 @@
import asyncio
import hashlib
import os
import random
import sys
import time
import platform
import traceback
from pathlib import Path
from typing import List, Optional, Sequence
from dotenv import load_dotenv
from rich.traceback import install
from colorama import init, Fore

View File

@@ -17,7 +17,7 @@ from typing import Optional, Tuple, Dict, Any
from src.common.logger import get_logger
from src.config.config import global_config
from src.chat.message_receive.message import MessageRecv
from .types import DetectionResult, ProcessResult
from .types import ProcessResult
from .core import PromptInjectionDetector, MessageShield
from .processors import should_skip_injection_detection, initialize_skip_list, MessageProcessor
from .management import AntiInjectionStatistics, UserBanManager

View File

@@ -5,7 +5,6 @@
负责根据检测结果和配置决定如何处理消息
"""
from typing import Dict, List
from src.common.logger import get_logger
from ..types import DetectionResult

View File

@@ -5,7 +5,6 @@
负责根据检测结果和配置决定如何处理消息
"""
from typing import Dict, List
from src.common.logger import get_logger
from .types import DetectionResult

View File

@@ -6,7 +6,7 @@
"""
import datetime
from typing import Dict, Any, Optional
from typing import Dict, Any
from src.common.logger import get_logger
from src.common.database.sqlalchemy_models import AntiInjectionStats, get_db_session

View File

@@ -5,12 +5,9 @@ from typing import Optional, Dict, Any
from src.common.logger import get_logger
from src.config.config import global_config
from src.chat.utils.timer_calculator import Timer
from src.chat.planner_actions.planner import ActionPlanner
from src.chat.planner_actions.action_modifier import ActionModifier
from src.plugin_system.core import events_manager
from src.plugin_system.base.component_types import EventType, ChatMode
from src.mais4u.mai_think import mai_thinking_manager
from src.plugin_system.base.component_types import ChatMode
from src.mais4u.constant_s4u import ENABLE_S4U
from src.chat.chat_loop.hfc_utils import send_typing, stop_typing
from .hfc_context import HfcContext
@@ -130,6 +127,34 @@ class CycleProcessor:
return True
async def execute_plan(self, action_result: Dict[str, Any], target_message: Optional[Dict[str, Any]]):
"""
执行一个已经制定好的计划
"""
action_type = action_result.get("action_type", "error")
# 这里我们需要为执行计划创建一个新的循环追踪
cycle_timers, thinking_id = self.cycle_tracker.start_cycle(is_proactive=True)
loop_start_time = time.time()
if action_type == "reply":
# 主动思考不应该直接触发简单回复,但为了逻辑完整性,我们假设它会调用 response_handler
# 注意:这里的 available_actions 和 plan_result 是缺失的,需要根据实际情况处理
await self._handle_reply_action(target_message, {}, None, loop_start_time, cycle_timers, thinking_id, {"action_result": action_result})
else:
await self._handle_other_actions(
action_type,
action_result.get("reasoning", ""),
action_result.get("action_data", {}),
action_result.get("is_parallel", False),
None,
target_message,
cycle_timers,
thinking_id,
{"action_result": action_result},
loop_start_time
)
async def _handle_reply_action(self, message_data, available_actions, gen_task, loop_start_time, cycle_timers, thinking_id, plan_result):
"""
处理回复类型的动作

View File

@@ -21,10 +21,13 @@ class CycleTracker:
"""
self.context = context
def start_cycle(self) -> Tuple[Dict[str, float], str]:
def start_cycle(self, is_proactive: bool = False) -> Tuple[Dict[str, float], str]:
"""
开始新的思考循环
Args:
is_proactive: 标记这个循环是否由主动思考发起
Returns:
tuple: (循环计时器字典, 思考ID字符串)
@@ -34,8 +37,11 @@ class CycleTracker:
- 生成唯一的思考ID
- 初始化循环计时器
"""
self.context.cycle_counter += 1
self.context.current_cycle_detail = CycleDetail(self.context.cycle_counter)
if not is_proactive:
self.context.cycle_counter += 1
cycle_id = self.context.cycle_counter if not is_proactive else f"{self.context.cycle_counter}.p"
self.context.current_cycle_detail = CycleDetail(cycle_id)
self.context.current_cycle_detail.thinking_id = f"tid{str(round(time.time(), 2))}"
cycle_timers = {}
return cycle_timers, self.context.current_cycle_detail.thinking_id
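A proactive cycle now reuses the current counter value instead of consuming a new number, so ids stay ordered while still marking their origin: reactive cycles get `5`, `6`, …, and a proactive cycle started after cycle 4 gets `"4.p"`. A minimal standalone sketch of the id scheme (the `_Ctx` stub and `next_cycle_id` helper are ours, not project code):

```python
# Minimal sketch of the cycle-id scheme above; _Ctx stands in for the HFC context.
class _Ctx:
    cycle_counter = 4  # last reactive cycle number

def next_cycle_id(ctx, is_proactive: bool = False):
    if not is_proactive:
        ctx.cycle_counter += 1       # reactive cycles advance the counter
        return ctx.cycle_counter     # -> 5, an int
    return f"{ctx.cycle_counter}.p"  # proactive cycles piggyback -> "4.p", a str

ctx = _Ctx()
assert next_cycle_id(ctx, is_proactive=True) == "4.p"
assert next_cycle_id(ctx) == 5
```

Because the id is an int for reactive cycles but a string for proactive ones, `CycleDetail.cycle_id` widens to `Union[int, str]` in a later hunk.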

View File

@@ -5,7 +5,6 @@ from typing import Optional
from src.common.logger import get_logger
from src.config.config import global_config
from src.chat.message_receive.chat_stream import get_chat_manager
from src.person_info.relationship_builder_manager import relationship_builder_manager
from src.chat.express.expression_learner import expression_learner_manager
from src.plugin_system.base.component_types import ChatMode

View File

@@ -1,4 +1,4 @@
from typing import List, Optional, Dict, Any, TYPE_CHECKING
from typing import List, Optional, TYPE_CHECKING
import time
from src.chat.message_receive.chat_stream import ChatStream, get_chat_manager
from src.person_info.relationship_builder_manager import RelationshipBuilder

View File

@@ -1,5 +1,5 @@
import time
from typing import Optional, Dict, Any
from typing import Optional, Dict, Any, Union
from src.config.config import global_config
from src.common.logger import get_logger
@@ -23,7 +23,7 @@ class CycleDetail:
- 提供序列化和转换功能
"""
def __init__(self, cycle_id: int):
def __init__(self, cycle_id: Union[int, str]):
"""
初始化循环详情记录

View File

@@ -1,7 +1,7 @@
import asyncio
import time
import traceback
from typing import Optional, Dict, Any, TYPE_CHECKING
from typing import Optional, TYPE_CHECKING
from src.common.logger import get_logger
from src.config.config import global_config
@@ -245,58 +245,23 @@ class ProactiveThinker:
Args:
silence_duration: 沉默持续时间(秒)
功能说明:
- 格式化沉默时间并记录触发日志
- 获取适当的思考提示模板
- 创建主动思考类型的消息数据
- 调用循环处理器执行思考和可能的回复
- 处理执行过程中的异常
"""
formatted_time = self._format_duration(silence_duration)
logger.info(f"{self.context.log_prefix} 触发主动思考,已沉默{formatted_time}")
try:
proactive_prompt = self._get_proactive_prompt(formatted_time)
# 直接调用 planner 的 PROACTIVE 模式
action_result_tuple, target_message = await self.cycle_processor.action_planner.plan(mode=ChatMode.PROACTIVE)
action_result = action_result_tuple.get("action_result")
thinking_message = {
"processed_plain_text": proactive_prompt,
"user_id": "system_proactive_thinking",
"user_platform": "system",
"timestamp": time.time(),
"message_type": "proactive_thinking",
"user_nickname": "系统主动思考",
"chat_info_platform": "system",
"message_id": f"proactive_{int(time.time())}",
}
logger.info(f"{self.context.log_prefix} 开始主动思考...")
await self.cycle_processor.observe(message_data=thinking_message)
logger.info(f"{self.context.log_prefix} 主动思考完成")
# 如果决策不是 do_nothing,则执行
if action_result and action_result.get("action_type") != "do_nothing":
logger.info(f"{self.context.log_prefix} 主动思考决策: {action_result.get('action_type')}, 原因: {action_result.get('reasoning')}")
# 将决策结果交给 cycle_processor 的后续流程处理
await self.cycle_processor.execute_plan(action_result, target_message)
else:
logger.info(f"{self.context.log_prefix} 主动思考决策: 保持沉默")
except Exception as e:
logger.error(f"{self.context.log_prefix} 主动思考执行异常: {e}")
logger.error(traceback.format_exc())
def _get_proactive_prompt(self, formatted_time: str) -> str:
"""
获取主动思考的提示模板
Args:
formatted_time: 格式化后的沉默时间字符串
Returns:
str: 填充了时间信息的提示模板
功能说明:
- 优先使用自定义的提示模板(如果配置了)
- 根据聊天类型(群聊/私聊)选择默认模板
- 将格式化的时间信息填入模板
- 返回完整的主动思考提示文本
"""
if hasattr(global_config.chat, 'proactive_thinking_prompt_template') and global_config.chat.proactive_thinking_prompt_template.strip():
return global_config.chat.proactive_thinking_prompt_template.format(time=formatted_time)
chat_type = "group" if self.context.chat_stream and self.context.chat_stream.group_info else "private"
prompt_template = self.proactive_thinking_prompts.get(chat_type, self.proactive_thinking_prompts["group"])
return prompt_template.format(time=formatted_time)
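Taken together, the rewritten trigger path is: plan directly in `ChatMode.PROACTIVE`, then either execute the chosen action or stay silent. A condensed sketch of that control flow with the collaborators stubbed (the stub bodies are ours; the real code awaits `ActionPlanner.plan` and `CycleProcessor.execute_plan`):

```python
import asyncio

async def plan(mode):  # stand-in for ActionPlanner.plan(mode=ChatMode.PROACTIVE)
    return {"action_result": {"action_type": "reply", "reasoning": "沉默很久了"}}, None

async def execute_plan(action_result, target_message):  # stand-in for CycleProcessor.execute_plan
    print("executing:", action_result["action_type"])

async def on_silence_threshold():
    action_result_tuple, target_message = await plan(mode="proactive")
    action_result = action_result_tuple.get("action_result")
    if action_result and action_result.get("action_type") != "do_nothing":
        await execute_plan(action_result, target_message)  # act on the decision
    # else: stay silent; nothing is sent

asyncio.run(on_silence_threshold())
```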

View File

@@ -1,7 +1,7 @@
import time
import random
import traceback
from typing import Optional, Dict, Any, List, Tuple
from typing import Optional, Dict, Any, Tuple
from src.common.logger import get_logger
from src.config.config import global_config

View File

@@ -23,15 +23,16 @@ from src.plugin_system.base.component_types import ActionInfo, ChatMode, Compone
from src.plugin_system.core.component_registry import component_registry
from src.manager.schedule_manager import schedule_manager
from src.mood.mood_manager import mood_manager
from src.chat.memory_system.Hippocampus import hippocampus_manager
logger = get_logger("planner")
install(extra_lines=3)
def init_prompt():
Prompt(
"""
{schedule_block}
Prompt(
"""
{schedule_block}
{mood_block}
{time_block}
{identity_block}
@@ -55,6 +56,32 @@ def init_prompt():
"planner_prompt",
)
Prompt(
"""
# 主动思考决策
## 你的内部状态
{time_block}
{identity_block}
{schedule_block}
{mood_block}
## 长期记忆摘要
{long_term_memory_block}
## 任务
基于以上所有信息,分析当前情况,决定是否需要主动做些什么。
如果你认为不需要,就选择 'do_nothing'
## 可用动作
{action_options_text}
你必须从上面列出的可用action中选择一个。
请以严格的 JSON 格式输出,且仅包含 JSON 内容:
""",
"proactive_planner_prompt",
)
Prompt(
"""
动作:{action_name}
@@ -84,6 +111,78 @@ class ActionPlanner:
self.plan_retry_count = 0
self.max_plan_retries = 3
async def _get_long_term_memory_context(self) -> str:
"""
获取长期记忆上下文
"""
try:
# 1. 生成时间相关的关键词
now = datetime.now()
keywords = ["今天", "日程", "计划"]
if 5 <= now.hour < 12:
keywords.append("早上")
elif 12 <= now.hour < 18:
keywords.append("中午")
else:
keywords.append("晚上")
# TODO: 添加与聊天对象相关的关键词
# 2. 调用 hippocampus_manager 检索记忆
retrieved_memories = await hippocampus_manager.get_memory_from_topic(
valid_keywords=keywords,
max_memory_num=5,
max_memory_length=1
)
if not retrieved_memories:
return "最近没有什么特别的记忆。"
# 3. 格式化记忆
memory_statements = []
for topic, memory_item in retrieved_memories:
memory_statements.append(f"关于'{topic}', 你记得'{memory_item}'")
return " ".join(memory_statements)
except Exception as e:
logger.error(f"获取长期记忆时出错: {e}")
return "回忆时出现了一些问题。"
async def _build_action_options(self, current_available_actions: Dict[str, ActionInfo], mode: ChatMode, target_prompt: str = "") -> str:
"""
构建动作选项
"""
action_options_block = ""
if mode == ChatMode.PROACTIVE:
action_options_block += """动作:do_nothing
动作描述:保持沉默,不主动发起任何动作或对话。
- 当你分析了所有信息后,觉得当前不是一个发起互动的好时机时
{{
"action": "do_nothing",
"reason":"决定保持沉默的具体原因"
}}
"""
for action_name, action_info in current_available_actions.items():
# TODO: 增加一个字段来判断action是否支持在PROACTIVE模式下使用
param_text = ""
if action_info.action_parameters:
param_text = "\n" + "\n".join(f' "{p_name}":"{p_desc}"' for p_name, p_desc in action_info.action_parameters.items())
require_text = "\n".join(f"- {req}" for req in action_info.action_require)
using_action_prompt = await global_prompt_manager.get_prompt_async("action_prompt")
action_options_block += using_action_prompt.format(
action_name=action_name,
action_description=action_info.description,
action_parameters=param_text,
action_require=require_text,
target_prompt=target_prompt,
)
return action_options_block
def find_message_by_id(self, message_id: str, message_id_list: list) -> Optional[Dict[str, Any]]:
# sourcery skip: use-next
"""
@@ -118,7 +217,7 @@ class ActionPlanner:
async def plan(
self, mode: ChatMode = ChatMode.FOCUS
) -> Tuple[Dict[str, Dict[str, Any] | str], Optional[Dict[str, Any]]]:
) -> Tuple[Dict[str, Any], Optional[Dict[str, Any]]]:
"""
规划器 (Planner): 使用LLM根据上下文决定做出什么动作。
"""
@@ -189,6 +288,11 @@ class ActionPlanner:
# 在FOCUS模式下非no_reply动作需要target_message_id
if mode == ChatMode.FOCUS and action != "no_reply":
if target_message_id := parsed_json.get("target_message_id"):
if isinstance(target_message_id, int):
target_message_id = str(target_message_id)
if isinstance(target_message_id, str) and not target_message_id.startswith('M'):
target_message_id = f"M{target_message_id}"
# 根据target_message_id查找原始消息
target_message = self.find_message_by_id(target_message_id, message_id_list)
# target_message = None
@@ -262,6 +366,40 @@ class ActionPlanner:
) -> tuple[str, list]: # sourcery skip: use-join
"""构建 Planner LLM 的提示词 (获取模板并填充数据)"""
try:
# --- 通用信息获取 ---
time_block = f"当前时间:{datetime.now().strftime('%Y-%m-%d %H:%M:%S')}"
bot_name = global_config.bot.nickname
bot_nickname = f",也有人叫你{','.join(global_config.bot.alias_names)}" if global_config.bot.alias_names else ""
bot_core_personality = global_config.personality.personality_core
identity_block = f"你的名字是{bot_name}{bot_nickname},你{bot_core_personality}"
schedule_block = ""
if global_config.schedule.enable:
if current_activity := schedule_manager.get_current_activity():
schedule_block = f"你当前正在:{current_activity}"
mood_block = ""
if global_config.mood.enable_mood:
chat_mood = mood_manager.get_mood_by_chat_id(self.chat_id)
mood_block = f"你现在的心情是:{chat_mood.mood_state}"
# --- 根据模式构建不同的Prompt ---
if mode == ChatMode.PROACTIVE:
long_term_memory_block = await self._get_long_term_memory_context()
action_options_text = await self._build_action_options(current_available_actions, mode)
prompt_template = await global_prompt_manager.get_prompt_async("proactive_planner_prompt")
prompt = prompt_template.format(
time_block=time_block,
identity_block=identity_block,
schedule_block=schedule_block,
mood_block=mood_block,
long_term_memory_block=long_term_memory_block,
action_options_text=action_options_text,
)
return prompt, []
# --- FOCUS 和 NORMAL 模式的逻辑 ---
message_list_before_now = get_raw_msg_before_timestamp_with_chat(
chat_id=self.chat_id,
timestamp=time.time(),
@@ -283,16 +421,11 @@ class ActionPlanner:
limit=5,
)
actions_before_now_block = build_readable_actions(
actions=actions_before_now,
)
actions_before_now_block = build_readable_actions(actions=actions_before_now)
actions_before_now_block = f"你刚刚选择并执行过的action是\n{actions_before_now_block}"
# 注意不在这里更新last_obs_time_mark应该在plan成功后再更新避免异常情况下错误更新时间戳
self.last_obs_time_mark = time.time()
if mode == ChatMode.FOCUS:
mentioned_bonus = ""
if global_config.chat.mentioned_bot_inevitable_reply:
@@ -318,7 +451,7 @@ class ActionPlanner:
}}
"""
else:
else: # NORMAL Mode
by_what = "聊天内容和用户的最新消息"
target_prompt = ""
no_action_block = """重要说明:
@@ -326,67 +459,14 @@ class ActionPlanner:
- 其他action表示在普通回复的基础上执行相应的额外动作"""
chat_context_description = "你现在正在一个群聊中"
chat_target_name = None # Only relevant for private
if not is_group_chat and chat_target_info:
chat_target_name = (
chat_target_info.get("person_name") or chat_target_info.get("user_nickname") or "对方"
)
chat_target_name = chat_target_info.get("person_name") or chat_target_info.get("user_nickname") or "对方"
chat_context_description = f"你正在和 {chat_target_name} 私聊"
action_options_block = ""
# 先定义 schedule_block 和 mood_block这些在主模板中需要使用
schedule_block = ""
if global_config.schedule.enable:
current_activity = schedule_manager.get_current_activity()
if current_activity:
schedule_block = f"你当前正在:{current_activity}"
mood_block = ""
if global_config.mood.enable_mood:
chat_mood = mood_manager.get_mood_by_chat_id(self.chat_id)
mood_block = f"你现在的心情是:{chat_mood.mood_state}"
for using_actions_name, using_actions_info in current_available_actions.items():
if using_actions_info.action_parameters:
param_text = "\n"
for param_name, param_description in using_actions_info.action_parameters.items():
param_text += f' "{param_name}":"{param_description}"\n'
param_text = param_text.rstrip("\n")
else:
param_text = ""
require_text = ""
for require_item in using_actions_info.action_require:
require_text += f"- {require_item}\n"
require_text = require_text.rstrip("\n")
using_action_prompt = await global_prompt_manager.get_prompt_async("action_prompt")
using_action_prompt = using_action_prompt.format(
schedule_block=schedule_block,
mood_block=mood_block,
action_name=using_actions_name,
action_description=using_actions_info.description,
action_parameters=param_text,
action_require=require_text,
target_prompt=target_prompt,
)
action_options_block += using_action_prompt
action_options_block = await self._build_action_options(current_available_actions, mode, target_prompt)
moderation_prompt_block = "请不要输出违法违规内容,不要输出色情,暴力,政治相关内容,如有敏感内容,请规避。"
time_block = f"当前时间:{datetime.now().strftime('%Y-%m-%d %H:%M:%S')}"
bot_name = global_config.bot.nickname
if global_config.bot.alias_names:
bot_nickname = f",也有人叫你{','.join(global_config.bot.alias_names)}"
else:
bot_nickname = ""
bot_core_personality = global_config.personality.personality_core
identity_block = f"你的名字是{bot_name}{bot_nickname},你{bot_core_personality}"
# 处理自定义提示词
custom_prompt_block = ""
if global_config.custom_prompt.planner_custom_prompt_enable and global_config.custom_prompt.planner_custom_prompt_content:
custom_prompt_block = global_config.custom_prompt.planner_custom_prompt_content
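One small but important fix in this hunk: the LLM occasionally returns `target_message_id` as a bare integer or without the `M` prefix used by the message list, so the planner now normalizes it before calling `find_message_by_id`. That normalization, isolated as a standalone sketch (the helper name is ours; the `M`-prefix convention comes from the hunk above):

```python
def normalize_message_id(raw):
    """Coerce an LLM-returned message id into the canonical 'M<number>' string form."""
    if isinstance(raw, int):
        raw = str(raw)       # 123   -> "123"
    if isinstance(raw, str) and not raw.startswith("M"):
        raw = f"M{raw}"      # "123" -> "M123"
    return raw

assert normalize_message_id(123) == "M123"
assert normalize_message_id("456") == "M456"
assert normalize_message_id("M789") == "M789"  # already canonical, untouched
```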

View File

@@ -1,6 +1,5 @@
# mmc/src/common/database/monthly_plan_db.py
import datetime
from typing import List
from src.common.database.sqlalchemy_models import MonthlyPlan, get_db_session
from src.common.logger import get_logger

View File

@@ -83,6 +83,7 @@ class TaskConfig(ValidatedConfigBase):
max_tokens: int = Field(default=800, description="任务最大输出token数")
temperature: float = Field(default=0.7, description="模型温度")
concurrency_count: int = Field(default=1, description="并发请求数量")
anti_truncation: bool = Field(default=False, description="是否启用反截断功能,防止模型输出被截断")
@field_validator('model_list')
@classmethod

View File

@@ -1,7 +1,7 @@
import asyncio
import io
import base64
from typing import Callable, AsyncIterator, Optional, Coroutine, Any, List, Dict, Union
from typing import Callable, AsyncIterator, Optional, Coroutine, Any, List, Dict
import google.generativeai as genai
from google.generativeai.types import (

View File

@@ -132,10 +132,21 @@ class LLMRequest:
# 内容混淆过滤指令
self.noise_instruction = """**【核心过滤规则】**
在交互过程中你会在用户的输入内容中看到一些无意义的、由字母和数字组成的随机字符串。这些字符串是API传输过程中的技术伪装信息【不是】用户真实意图的一部分。
在交互过程中,你会在用户的输入内容中看到一些无意义的、由字母和数字组成的随机字符串。这些字符串是API传输过程中的技术伪装信息,【不是】用户真实意图的一部分。
你的任务是【完全并彻底地忽略】这些随机字符串。
**【严禁】** 将这些字符串写入你的回复、对它们进行任何形式的解读、或将它们作为故事的一部分进行模拟。当看到这些字符串时请将其视为空白或直接跳过并根据用户其他有意义的输入如有或当前故事情节来继续你的叙事。此规则为最高优先级。
**【严禁】** 将这些字符串写入你的回复、对它们进行任何形式的解读、或将它们作为故事的一部分进行模拟。当看到这些字符串时,请将其视为空白或直接跳过,并根据用户其他有意义的输入(如有)或当前故事情节来继续你的叙事。此规则为最高优先级。
"""
# 反截断指令
self.anti_truncation_instruction = """
**【输出完成信令】**
这是一个非常重要的指令,请务必遵守。在你的回复内容完全结束后,请务必在最后另起一行,只写 `[done]` 作为结束标志。
例如:
<你的回复内容>
[done]
这有助于我判断你的输出是否被截断。请不要在 `[done]` 前后添加任何其他文字或标点。
"""
async def generate_response_for_image(
@@ -276,7 +287,16 @@ class LLMRequest:
# 模型选择和请求准备
start_time = time.time()
model_info, api_provider, client = self._select_model()
processed_prompt = self._apply_content_obfuscation(prompt, api_provider)
# 检查是否启用反截断
use_anti_truncation = getattr(self.model_for_task, "anti_truncation", False)
processed_prompt = prompt
if use_anti_truncation:
processed_prompt += self.anti_truncation_instruction
logger.info(f"任务 '{self.task_name}' 已启用反截断功能")
processed_prompt = self._apply_content_obfuscation(processed_prompt, api_provider)
message_builder = MessageBuilder()
message_builder.add_text_content(processed_prompt)
@@ -308,12 +328,22 @@ class LLMRequest:
content, extracted_reasoning = self._extract_reasoning(content)
reasoning_content = extracted_reasoning
# 检测是否为空回复
# 检测是否为空回复或截断
is_empty_reply = not content or content.strip() == ""
is_truncated = False
if is_empty_reply and empty_retry_count < max_empty_retry:
if use_anti_truncation:
if content.endswith("[done]"):
content = content[:-6].strip()
logger.debug("检测到并已移除 [done] 标记")
else:
is_truncated = True
logger.warning("未检测到 [done] 标记,判定为截断")
if (is_empty_reply or is_truncated) and empty_retry_count < max_empty_retry:
empty_retry_count += 1
logger.warning(f"检测到空回复,正在进行第 {empty_retry_count}/{max_empty_retry} 次重新生成")
reason = "空回复" if is_empty_reply else "截断"
logger.warning(f"检测到{reason},正在进行第 {empty_retry_count}/{max_empty_retry} 次重新生成")
if empty_retry_interval > 0:
await asyncio.sleep(empty_retry_interval)
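The anti-truncation changes amount to a simple completion-signal protocol: the appended instruction asks the model to end its reply with a literal `[done]` line, and the response path either strips that marker or flags the reply as truncated, feeding it into the existing empty-reply retry loop. A minimal sketch of the detection step (standalone; the hunk's `content[:-6].strip()` is equivalent for the six-character marker, and the retry plumbing is omitted):

```python
DONE_MARK = "[done]"

def check_done_marker(content: str):
    """Return (cleaned_content, is_truncated) under the [done] protocol."""
    stripped = content.rstrip()
    if stripped.endswith(DONE_MARK):
        # Marker present: the model finished; remove the marker before use.
        return stripped[: -len(DONE_MARK)].rstrip(), False
    # Marker missing: assume the output was cut off so the caller can retry.
    return content, True

assert check_done_marker("你好,今天天气不错。\n[done]") == ("你好,今天天气不错。", False)
assert check_done_marker("你好,今天天") == ("你好,今天天", True)
```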

View File

@@ -40,6 +40,7 @@ class ChatMode(Enum):
FOCUS = "focus" # Focus聊天模式
NORMAL = "normal" # Normal聊天模式
PROACTIVE = "proactive" # 主动思考模式
PRIORITY = "priority" # 优先级聊天模式
ALL = "all" # 所有聊天模式

View File

@@ -13,7 +13,6 @@ from src.plugin_system.base.plugin_base import PluginBase
from src.plugin_system.base.component_types import ComponentType
from src.plugin_system.utils.manifest_utils import VersionComparator
from .component_registry import component_registry
import asyncio
from src.chat.antipromptinjector.processors.command_skip_list import skip_list_manager

View File

@@ -0,0 +1,436 @@
from src.common.logger import get_logger
from bs4 import BeautifulSoup
import requests
import random
import os
import traceback
logger = get_logger("web_surfing_tool")
ABSTRACT_MAX_LENGTH = 300 # abstract max length
user_agents = [
# Edge浏览器
"Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/122.0.0.0 Safari/537.36 Edg/122.0.0.0",
"Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/121.0.0.0 Safari/537.36 Edg/121.0.0.0",
"Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/122.0.0.0 Safari/537.36 Edg/122.0.0.0",
# Chrome浏览器
"Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/122.0.0.0 Safari/537.36",
"Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/121.0.0.0 Safari/537.36",
"Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/122.0.0.0 Safari/537.36",
# Firefox浏览器
"Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:123.0) Gecko/20100101 Firefox/123.0",
"Mozilla/5.0 (Macintosh; Intel Mac OS X 10.15; rv:123.0) Gecko/20100101 Firefox/123.0",
"Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:122.0) Gecko/20100101 Firefox/122.0",
# Safari浏览器
"Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/16.6 Safari/605.1.15",
"Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/17.0 Safari/605.1.15",
# 移动端浏览器
"Mozilla/5.0 (iPhone; CPU iPhone OS 17_4 like Mac OS X) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/17.0 Mobile/15E148 Safari/604.1",
"Mozilla/5.0 (iPad; CPU OS 17_4 like Mac OS X) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/17.0 Mobile/15E148 Safari/604.1",
"Mozilla/5.0 (Linux; Android 14; SM-S918B) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/122.0.0.0 Mobile Safari/537.36",
# 搜索引擎爬虫 (模拟)
"Mozilla/5.0 (compatible; Googlebot/2.1; +http://www.google.com/bot.html)",
"Mozilla/5.0 (compatible; Bingbot/2.0; +http://www.bing.com/bingbot.htm)",
"Mozilla/5.0 (compatible; YandexBot/3.0; +http://yandex.com/bots)",
]
# 请求头信息
HEADERS = {
"Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.7",
"Accept-Encoding": "gzip, deflate, br",
"Accept-Language": "zh-CN,zh;q=0.9,en;q=0.8,en-GB;q=0.7,en-US;q=0.6",
"Cache-Control": "max-age=0",
"Connection": "keep-alive",
"Host": "www.bing.com",
"Referer": "https://www.bing.com/",
"Sec-Ch-Ua": '"Chromium";v="122", "Microsoft Edge";v="122", "Not-A.Brand";v="99"',
"Sec-Ch-Ua-Mobile": "?0",
"Sec-Ch-Ua-Platform": '"Windows"',
"Sec-Fetch-Dest": "document",
"Sec-Fetch-Mode": "navigate",
"Sec-Fetch-Site": "same-origin",
"Sec-Fetch-User": "?1",
"Upgrade-Insecure-Requests": "1",
"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/122.0.0.0 Safari/537.36 Edg/122.0.0.0",
}
# 替代的中国区必应请求头
CN_BING_HEADERS = {
"Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.7",
"Accept-Encoding": "gzip, deflate, br",
"Accept-Language": "zh-CN,zh;q=0.9,en;q=0.8,en-GB;q=0.7,en-US;q=0.6",
"Cache-Control": "max-age=0",
"Connection": "keep-alive",
"Host": "cn.bing.com",
"Referer": "https://cn.bing.com/",
"Sec-Ch-Ua": '"Chromium";v="122", "Microsoft Edge";v="122", "Not-A.Brand";v="99"',
"Sec-Ch-Ua-Mobile": "?0",
"Sec-Ch-Ua-Platform": '"Windows"',
"Sec-Fetch-Dest": "document",
"Sec-Fetch-Mode": "navigate",
"Sec-Fetch-Site": "same-origin",
"Sec-Fetch-User": "?1",
"Upgrade-Insecure-Requests": "1",
"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/122.0.0.0 Safari/537.36 Edg/122.0.0.0",
}
bing_host_url = "https://www.bing.com"
bing_search_url = "https://www.bing.com/search?q="
cn_bing_host_url = "https://cn.bing.com"
cn_bing_search_url = "https://cn.bing.com/search?q="
class BingSearch:
session = requests.Session()
session.headers = HEADERS
def search(self, keyword, num_results=10):
"""
通过关键字进行搜索
:param keyword: 关键字
:param num_results: 指定返回的结果个数
:return: 结果列表
"""
if not keyword:
return None
list_result = []
page = 1
# 起始搜索的url
next_url = bing_search_url + keyword
# 循环遍历每一页的搜索结果并返回下一页的url
while len(list_result) < num_results:
data, next_url = self.parse_html(next_url, rank_start=len(list_result))
if data:
list_result += data
logger.debug(
"---searching[{}], finished parsing page {}, results number={}".format(keyword, page, len(data))
)
for d in data:
logger.debug(str(d))
if not next_url:
logger.debug("already searched the last page.")
break
page += 1
logger.debug("\n---search [{}] finished. total results number={}".format(keyword, len(list_result)))
return list_result[:num_results] if len(list_result) > num_results else list_result
def parse_html(self, url, rank_start=0, debug=0):
"""
解析处理结果
:param url: 需要抓取的 url
:return: 结果列表,下一页的url
"""
try:
logger.debug("--search_bing-------url: {}".format(url))
# 确定是国际版还是中国版必应
is_cn_bing = "cn.bing.com" in url
# 保存当前URL以便调试
query_part = url.split("?q=")[1] if "?q=" in url else "unknown_query"
debug_filename = f"debug/bing_{'cn' if is_cn_bing else 'www'}_search_{query_part[:30]}.html"
# 设置必要的Cookie
cookies = {
"SRCHHPGUSR": "SRCHLANG=zh-Hans", # 设置默认搜索语言为中文
"SRCHD": "AF=NOFORM",
"SRCHUID": "V=2&GUID=1A4D4F1C8844493F9A2E3DB0D1BC806C",
"_SS": "SID=0D89D9A3C95C60B62E7AC80CC85461B3",
"_EDGE_S": "ui=zh-cn", # 设置界面语言为中文
"_EDGE_V": "1",
}
# 使用适当的请求头
# 为每次请求随机选择不同的用户代理,降低被屏蔽风险
headers = CN_BING_HEADERS.copy() if is_cn_bing else HEADERS.copy()
headers["User-Agent"] = random.choice(user_agents)
# 为不同域名使用不同的Session避免Cookie污染
session = requests.Session()
session.headers.update(headers)
session.cookies.update(cookies)
# 添加超时和重试,降低超时时间并允许重试
try:
res = session.get(
url=url, timeout=(3.05, 6), verify=True, allow_redirects=True
) # 超时分别为连接超时和读取超时
except (requests.exceptions.Timeout, requests.exceptions.ConnectionError) as e:
# 如果第一次尝试超时,使用更宽松的设置再试一次
logger.warning(f"第一次请求超时,正在重试: {str(e)}")
try:
# 第二次尝试使用更长的超时时间
res = session.get(url=url, timeout=(5, 10), verify=False) # 忽略SSL验证
except Exception as e2:
logger.error(f"第二次请求也失败: {str(e2)}")
# 如果所有尝试都失败,返回空结果
return [], None
res.encoding = "utf-8"
# 保存响应内容以便调试
os.makedirs("debug", exist_ok=True)
with open(debug_filename, "w", encoding="utf-8") as f:
f.write(res.text)
# 检查响应状态
logger.debug(f"--search_bing-------status_code: {res.status_code}")
if res.status_code == 403:
logger.error("被禁止访问 (403 Forbidden)可能是IP被限制")
# 如果被禁止,返回空结果
return [], None
if res.status_code != 200:
logger.error(f"必应搜索请求失败,状态码: {res.status_code}")
return None, None
# 检查是否被重定向到登录页面或验证页面
if "login.live.com" in res.url or "login.microsoftonline.com" in res.url:
logger.error("被重定向到登录页面,可能需要登录")
return None, None
if "https://www.bing.com/ck/a" in res.url:
logger.error("被重定向到验证页面,可能被识别为机器人")
return None, None
# 解析HTML - 添加对多种解析器的支持
try:
# 首先尝试使用lxml解析器
root = BeautifulSoup(res.text, "lxml")
except Exception as e:
logger.warning(f"lxml解析器不可用: {str(e)}尝试使用html.parser")
try:
# 如果lxml不可用使用内置解析器
root = BeautifulSoup(res.text, "html.parser")
except Exception as e2:
logger.error(f"HTML解析失败: {str(e2)}")
return None, None
# 保存解析结果的一小部分用于调试
sample_html = str(root)[:1000] if root else ""
logger.debug(f"HTML解析结果示例: {sample_html}")
list_data = []
# 确保我们能获取到内容 - 先尝试直接提取链接
all_links = root.find_all("a")
# 记录链接总数,帮助诊断
logger.debug(f"页面中总共找到了 {len(all_links)} 个链接")
# 保存一些链接示例到日志
sample_links = []
for i, link in enumerate(all_links):
if i < 10: # 只记录前10个链接
sample_links.append({"text": link.text.strip(), "href": link.get("href", "")})
logger.debug(f"链接示例: {sample_links}")
# 方法0查找动态提取的结果
# 尝试查找包含完整结果项的父容器
result_containers = []
# 一些可能的结果容器选择器
container_selectors = [
"ol#b_results",
"div.b_searchResults",
"div#b_content",
"div.srchrslt_main",
"div.mspg_cont",
"div.ms-srchResult-results",
"div#ContentAll",
"div.resultlist",
]
for selector in container_selectors:
containers = root.select(selector)
if containers:
logger.debug(f"找到可能的结果容器: {selector}, 数量: {len(containers)}")
result_containers.extend(containers)
# 如果找到容器,尝试在容器中寻找有价值的链接
extracted_items = []
if result_containers:
for container in result_containers:
# 查找标题元素h1, h2, h3, h4
for heading in container.find_all(["h1", "h2", "h3", "h4", "strong", "b"]):
# 如果标题元素包含链接,这很可能是搜索结果的标题
link = heading.find("a")
if link and link.get("href") and link.text.strip():
url = link.get("href")
title = link.text.strip()
# 如果是有效的外部链接
if (
not url.startswith("javascript:")
and not url.startswith("#")
and not any(x in url for x in ["bing.com/search", "bing.com/images"])
):
# 查找摘要:尝试找到相邻的段落元素
abstract = ""
# 尝试在标题后面查找摘要
next_elem = heading.next_sibling
while next_elem and not abstract:
if hasattr(next_elem, "name") and next_elem.name in ["p", "div", "span"]:
abstract = next_elem.text.strip()
break
next_elem = next_elem.next_sibling
# 如果没找到,尝试在父元素内查找其他段落
if not abstract:
parent = heading.parent
for p in parent.find_all(
["p", "div"],
class_=lambda c: c
and any(
x in str(c) for x in ["desc", "abstract", "snippet", "caption", "summary"]
),
):
if p != heading:
abstract = p.text.strip()
break
# 创建结果项
extracted_items.append(
{
"title": title,
"url": url,
"abstract": abstract,
}
)
logger.debug(f"提取到搜索结果: {title}")
# 如果找到了结果,添加到列表
if extracted_items:
for rank, item in enumerate(extracted_items, start=rank_start + 1):
# 裁剪摘要长度
abstract = item["abstract"]
if ABSTRACT_MAX_LENGTH and len(abstract) > ABSTRACT_MAX_LENGTH:
abstract = abstract[:ABSTRACT_MAX_LENGTH]
list_data.append({"title": item["title"], "abstract": abstract, "url": item["url"], "rank": rank})
logger.debug(f"从容器中提取了 {len(list_data)} 个搜索结果")
if list_data:
return list_data, None
# 如果上面的方法没有找到结果,尝试通用链接提取
valid_links = []
for link in all_links:
href = link.get("href", "")
text = link.text.strip()
# 有效的搜索结果链接通常有这些特点
if (
href
and text
and len(text) > 10 # 标题通常比较长
and not href.startswith("javascript:")
and not href.startswith("#")
and not any(
x in href
for x in [
"bing.com/search",
"bing.com/images",
"bing.com/videos",
"bing.com/maps",
"bing.com/news",
"login",
"account",
"javascript",
"about.html",
"help.html",
"microsoft",
]
)
and "http" in href
): # 必须是有效URL
valid_links.append(link)
# 按文本长度排序,更长的文本更可能是搜索结果标题
valid_links.sort(key=lambda x: len(x.text.strip()), reverse=True)
if valid_links:
logger.debug(f"找到 {len(valid_links)} 个可能的搜索结果链接")
# 提取前10个作为搜索结果
for rank, link in enumerate(valid_links[:10], start=rank_start + 1):
href = link.get("href", "")
text = link.text.strip()
# 获取摘要
abstract = ""
# 尝试获取父元素的文本作为摘要
parent = link.parent
if parent and parent.text:
full_text = parent.text.strip()
if len(full_text) > len(text):
abstract = full_text.replace(text, "", 1).strip()
# 如果没有找到好的摘要,尝试查找相邻元素
if len(abstract) < 20:
next_elem = link.next_sibling
while next_elem and len(abstract) < 20:
if hasattr(next_elem, "text") and next_elem.text.strip():
abstract = next_elem.text.strip()
break
next_elem = next_elem.next_sibling
# 裁剪摘要长度
if ABSTRACT_MAX_LENGTH and len(abstract) > ABSTRACT_MAX_LENGTH:
abstract = abstract[:ABSTRACT_MAX_LENGTH]
list_data.append({"title": text, "abstract": abstract, "url": href, "rank": rank})
logger.debug(f"提取到备选搜索结果 #{rank}: {text}")
# 如果找到了结果,返回
if list_data:
logger.debug(f"通过备选方法提取了 {len(list_data)} 个搜索结果")
return list_data, None
# 检查是否有错误消息
error_msg = root.find("div", class_="b_searcherrmsg")
if error_msg:
logger.error(f"必应搜索返回错误: {error_msg.text.strip()}")
# 找到下一页按钮 (尝试多种可能的选择器)
next_url = None
# 方式1: 标准下一页按钮
pagination_classes = ["b_widePag sb_bp", "b_pag"]
for cls in pagination_classes:
next_page = root.find("a", class_=cls)
if next_page and any(txt in next_page.text for txt in ["下一页", "Next", "下页"]):
next_url = next_page.get("href", "")
if next_url and not next_url.startswith("http"):
next_url = (cn_bing_host_url if is_cn_bing else bing_host_url) + next_url
break
# 方式2: 备用下一页按钮
if not next_url:
pagination = root.find_all("a", class_="sb_pagN")
if pagination:
next_url = pagination[0].get("href", "")
if next_url and not next_url.startswith("http"):
next_url = (cn_bing_host_url if is_cn_bing else bing_host_url) + next_url
# 方式3: 通用导航元素
if not next_url:
nav_links = root.find_all("a")
for link in nav_links:
if link.text.strip() in ["下一页", "Next", "下页", "»", ">>"]:
next_url = link.get("href", "")
if next_url and not next_url.startswith("http"):
next_url = (cn_bing_host_url if is_cn_bing else bing_host_url) + next_url
break
logger.debug(f"已解析 {len(list_data)} 个结果,下一页链接: {next_url}")
return list_data, next_url
except Exception as e:
logger.error(f"解析页面时出错: {str(e)}")
logger.debug(traceback.format_exc())
return None, None
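`BingSearch` exposes a single synchronous entry point, which is why the tool layer later wraps it in `run_in_executor`. A usage sketch (this hits the live Bing endpoint, so output depends on network access and on Bing's current HTML layout):

```python
searcher = BingSearch()
results = searcher.search("python asyncio tutorial", num_results=5)
for item in results or []:  # search() returns None when the keyword is empty
    print(item["rank"], item["title"], item["url"])
    print("    ", item["abstract"][:80])
```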

View File

@@ -6,6 +6,7 @@ from datetime import datetime, timedelta
from exa_py import Exa
from asyncddgs import aDDGS
from tavily import TavilyClient
from .bing_search import BingSearch
from src.common.logger import get_logger
from typing import Tuple,Type
@@ -39,6 +40,7 @@ class WebSurfingTool(BaseTool):
def __init__(self, plugin_config=None):
super().__init__(plugin_config)
self.bing_search = BingSearch()
# 初始化EXA API密钥轮询器
self.exa_clients = []
@@ -124,19 +126,17 @@ class WebSurfingTool(BaseTool):
search_tasks = []
for engine in enabled_engines:
custom_args = function_args.copy()
custom_args["num_results"] = custom_args.get("num_results", 5)
if engine == "exa" and self.exa_clients:
# 使用参数中的数量如果没有则默认5个
custom_args = function_args.copy()
custom_args["num_results"] = custom_args.get("num_results", 5)
search_tasks.append(self._search_exa(custom_args))
elif engine == "tavily" and self.tavily_clients:
custom_args = function_args.copy()
custom_args["num_results"] = custom_args.get("num_results", 5)
search_tasks.append(self._search_tavily(custom_args))
elif engine == "ddg":
custom_args = function_args.copy()
custom_args["num_results"] = custom_args.get("num_results", 5)
search_tasks.append(self._search_ddg(custom_args))
elif engine == "bing":
search_tasks.append(self._search_bing(custom_args))
if not search_tasks:
return {"error": "没有可用的搜索引擎。"}
@@ -177,6 +177,8 @@ class WebSurfingTool(BaseTool):
results = await self._search_tavily(custom_args)
elif engine == "ddg":
results = await self._search_ddg(custom_args)
elif engine == "bing":
results = await self._search_bing(custom_args)
else:
continue
@@ -206,6 +208,8 @@ class WebSurfingTool(BaseTool):
results = await self._search_tavily(custom_args)
elif engine == "ddg":
results = await self._search_ddg(custom_args)
elif engine == "bing":
results = await self._search_bing(custom_args)
else:
continue
@@ -332,6 +336,28 @@ class WebSurfingTool(BaseTool):
logger.error(f"DuckDuckGo 搜索失败: {e}")
return []
async def _search_bing(self, args: Dict[str, Any]) -> List[Dict[str, Any]]:
query = args["query"]
num_results = args.get("num_results", 3)
try:
loop = asyncio.get_running_loop()
func = functools.partial(self.bing_search.search, query, num_results=num_results)
search_response = await loop.run_in_executor(None, func)
if search_response:
return [
{
"title": r.get("title"),
"url": r.get("url"),
"snippet": r.get("abstract"),
"provider": "Bing"
}
for r in search_response
]
except Exception as e:
logger.error(f"Bing 搜索失败: {e}")
return []
def _format_results(self, results: List[Dict[str, Any]]) -> str:
if not results:
return "没有找到相关的网络信息。"
@@ -586,8 +612,10 @@ class URLParserTool(BaseTool):
}
# 保存到缓存
import os
current_file_path = os.path.abspath(__file__)
if "error" not in result:
await tool_cache.set(self.name, function_args, self.__class__, result)
await tool_cache.set(self.name, function_args, current_file_path, result)
return result

View File

@@ -12,8 +12,7 @@
from src.plugin_system.base import BaseCommand
from src.chat.antipromptinjector import get_anti_injector
from src.chat.antipromptinjector.processors.command_skip_list import (
get_skip_patterns_info,
skip_list_manager
get_skip_patterns_info
)
from src.common.logger import get_logger

View File

@@ -246,7 +246,7 @@ class QZoneService:
config_image_number = int(config_image_number)
except (ValueError, TypeError):
config_image_number = 1
logger.warning(f"配置项 image_number 值无效,使用默认值 1")
logger.warning("配置项 image_number 值无效,使用默认值 1")
max_images = min(min(config_image_number, 9), len(all_files)) # 最多9张最少1张
selected_count = max(1, max_images) # 确保至少选择1张
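The nested `min`/`max` pins the image count to QZone's 1–9 range while never exceeding the files actually available. A worked sketch of the clamp (the helper name is ours; the logic mirrors the two lines above):

```python
def clamp_image_count(config_image_number: int, available_files: int) -> int:
    """Clamp the configured image count to [1, 9], bounded by available files."""
    max_images = min(min(config_image_number, 9), available_files)
    return max(1, max_images)

assert clamp_image_count(20, 50) == 9  # capped at QZone's 9-image limit
assert clamp_image_count(5, 3) == 3    # cannot exceed the files on hand
assert clamp_image_count(0, 10) == 1   # always selects at least one
```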

View File

@@ -1,676 +0,0 @@
import asyncio
import functools
import itertools
from typing import Any, Dict, List
from datetime import datetime, timedelta
from exa_py import Exa
from asyncddgs import aDDGS
from tavily import TavilyClient
from src.common.logger import get_logger
from typing import Tuple,Type
from src.plugin_system import (
BasePlugin,
register_plugin,
BaseTool,
ComponentInfo,
ConfigField,
llm_api,
ToolParamType,
PythonDependency
)
from src.plugin_system.apis import config_api # 添加config_api导入
from src.common.cache_manager import tool_cache
import httpx
from bs4 import BeautifulSoup
logger = get_logger("web_surfing_tool")
class WebSurfingTool(BaseTool):
name: str = "web_search"
description: str = "用于执行网络搜索。当用户明确要求搜索,或者需要获取关于公司、产品、事件的最新信息、新闻或动态时,必须使用此工具"
available_for_llm: bool = True
parameters = [
("query", ToolParamType.STRING, "要搜索的关键词或问题。", True, None),
("num_results", ToolParamType.INTEGER, "期望每个搜索引擎返回的搜索结果数量默认为5。", False, None),
("time_range", ToolParamType.STRING, "指定搜索的时间范围,可以是 'any', 'week', 'month'。默认为 'any'", False, ["any", "week", "month"])
] # type: ignore
def __init__(self, plugin_config=None):
super().__init__(plugin_config)
# 初始化EXA API密钥轮询器
self.exa_clients = []
self.exa_key_cycle = None
# 优先从主配置文件读取,如果没有则从插件配置文件读取
EXA_API_KEYS = config_api.get_global_config("exa.api_keys", None)
if EXA_API_KEYS is None:
# 从插件配置文件读取
EXA_API_KEYS = self.get_config("exa.api_keys", [])
if isinstance(EXA_API_KEYS, list) and EXA_API_KEYS:
valid_keys = [key.strip() for key in EXA_API_KEYS if isinstance(key, str) and key.strip() not in ("None", "")]
if valid_keys:
self.exa_clients = [Exa(api_key=key) for key in valid_keys]
self.exa_key_cycle = itertools.cycle(self.exa_clients)
logger.info(f"已配置 {len(valid_keys)} 个 Exa API 密钥")
else:
logger.warning("Exa API Keys 配置无效Exa 搜索功能将不可用。")
else:
logger.warning("Exa API Keys 未配置Exa 搜索功能将不可用。")
# 初始化Tavily API密钥轮询器
self.tavily_clients = []
self.tavily_key_cycle = None
# 优先从主配置文件读取,如果没有则从插件配置文件读取
TAVILY_API_KEYS = config_api.get_global_config("tavily.api_keys", None)
if TAVILY_API_KEYS is None:
# 从插件配置文件读取
TAVILY_API_KEYS = self.get_config("tavily.api_keys", [])
if isinstance(TAVILY_API_KEYS, list) and TAVILY_API_KEYS:
valid_keys = [key.strip() for key in TAVILY_API_KEYS if isinstance(key, str) and key.strip() not in ("None", "")]
if valid_keys:
self.tavily_clients = [TavilyClient(api_key=key) for key in valid_keys]
self.tavily_key_cycle = itertools.cycle(self.tavily_clients)
logger.info(f"已配置 {len(valid_keys)} 个 Tavily API 密钥")
else:
logger.warning("Tavily API Keys 配置无效Tavily 搜索功能将不可用。")
else:
logger.warning("Tavily API Keys 未配置Tavily 搜索功能将不可用。")
async def execute(self, function_args: Dict[str, Any]) -> Dict[str, Any]:
query = function_args.get("query")
if not query:
return {"error": "搜索查询不能为空。"}
# 获取当前文件路径用于缓存键
import os
current_file_path = os.path.abspath(__file__)
# 检查缓存
query = function_args.get("query")
cached_result = await tool_cache.get(self.name, function_args, current_file_path, semantic_query=query)
if cached_result:
logger.info(f"缓存命中: {self.name} -> {function_args}")
return cached_result
# 读取搜索配置
enabled_engines = config_api.get_global_config("web_search.enabled_engines", ["ddg"])
search_strategy = config_api.get_global_config("web_search.search_strategy", "single")
logger.info(f"开始搜索,策略: {search_strategy}, 启用引擎: {enabled_engines}, 参数: '{function_args}'")
# 根据策略执行搜索
if search_strategy == "parallel":
result = await self._execute_parallel_search(function_args, enabled_engines)
elif search_strategy == "fallback":
result = await self._execute_fallback_search(function_args, enabled_engines)
else: # single
result = await self._execute_single_search(function_args, enabled_engines)
# 保存到缓存
if "error" not in result:
query = function_args.get("query")
await tool_cache.set(self.name, function_args, current_file_path, result, semantic_query=query)
return result
async def _execute_parallel_search(self, function_args: Dict[str, Any], enabled_engines: List[str]) -> Dict[str, Any]:
"""并行搜索策略:同时使用所有启用的搜索引擎"""
search_tasks = []
for engine in enabled_engines:
if engine == "exa" and self.exa_clients:
# 使用参数中的数量如果没有则默认5个
custom_args = function_args.copy()
custom_args["num_results"] = custom_args.get("num_results", 5)
search_tasks.append(self._search_exa(custom_args))
elif engine == "tavily" and self.tavily_clients:
custom_args = function_args.copy()
custom_args["num_results"] = custom_args.get("num_results", 5)
search_tasks.append(self._search_tavily(custom_args))
elif engine == "ddg":
custom_args = function_args.copy()
custom_args["num_results"] = custom_args.get("num_results", 5)
search_tasks.append(self._search_ddg(custom_args))
if not search_tasks:
return {"error": "没有可用的搜索引擎。"}
try:
search_results_lists = await asyncio.gather(*search_tasks, return_exceptions=True)
all_results = []
for result in search_results_lists:
if isinstance(result, list):
all_results.extend(result)
elif isinstance(result, Exception):
logger.error(f"搜索时发生错误: {result}")
# 去重并格式化
unique_results = self._deduplicate_results(all_results)
formatted_content = self._format_results(unique_results)
return {
"type": "web_search_result",
"content": formatted_content,
}
except Exception as e:
logger.error(f"执行并行网络搜索时发生异常: {e}", exc_info=True)
return {"error": f"执行网络搜索时发生严重错误: {str(e)}"}
async def _execute_fallback_search(self, function_args: Dict[str, Any], enabled_engines: List[str]) -> Dict[str, Any]:
"""回退搜索策略:按顺序尝试搜索引擎,失败则尝试下一个"""
for engine in enabled_engines:
try:
custom_args = function_args.copy()
custom_args["num_results"] = custom_args.get("num_results", 5)
if engine == "exa" and self.exa_clients:
results = await self._search_exa(custom_args)
elif engine == "tavily" and self.tavily_clients:
results = await self._search_tavily(custom_args)
elif engine == "ddg":
results = await self._search_ddg(custom_args)
else:
continue
if results: # 如果有结果,直接返回
formatted_content = self._format_results(results)
return {
"type": "web_search_result",
"content": formatted_content,
}
except Exception as e:
logger.warning(f"{engine} 搜索失败,尝试下一个引擎: {e}")
continue
return {"error": "所有搜索引擎都失败了。"}
async def _execute_single_search(self, function_args: Dict[str, Any], enabled_engines: List[str]) -> Dict[str, Any]:
"""单一搜索策略:只使用第一个可用的搜索引擎"""
for engine in enabled_engines:
custom_args = function_args.copy()
custom_args["num_results"] = custom_args.get("num_results", 5)
try:
if engine == "exa" and self.exa_clients:
results = await self._search_exa(custom_args)
elif engine == "tavily" and self.tavily_clients:
results = await self._search_tavily(custom_args)
elif engine == "ddg":
results = await self._search_ddg(custom_args)
else:
continue
formatted_content = self._format_results(results)
return {
"type": "web_search_result",
"content": formatted_content,
}
except Exception as e:
logger.error(f"{engine} 搜索失败: {e}")
return {"error": f"{engine} 搜索失败: {str(e)}"}
return {"error": "没有可用的搜索引擎。"}
def _deduplicate_results(self, results: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
unique_urls = set()
unique_results = []
for res in results:
if isinstance(res, dict) and res.get("url") and res["url"] not in unique_urls:
unique_urls.add(res["url"])
unique_results.append(res)
return unique_results
async def _search_exa(self, args: Dict[str, Any]) -> List[Dict[str, Any]]:
query = args["query"]
num_results = args.get("num_results", 3)
time_range = args.get("time_range", "any")
exa_args = {"num_results": num_results, "text": True, "highlights": True}
if time_range != "any":
today = datetime.now()
start_date = today - timedelta(days=7 if time_range == "week" else 30)
exa_args["start_published_date"] = start_date.strftime('%Y-%m-%d')
try:
if not self.exa_key_cycle:
return []
# 使用轮询机制获取下一个客户端
exa_client = next(self.exa_key_cycle)
loop = asyncio.get_running_loop()
func = functools.partial(exa_client.search_and_contents, query, **exa_args)
search_response = await loop.run_in_executor(None, func)
return [
{
"title": res.title,
"url": res.url,
"snippet": " ".join(getattr(res, 'highlights', [])) or (getattr(res, 'text', '')[:250] + '...'),
"provider": "Exa"
}
for res in search_response.results
]
except Exception as e:
logger.error(f"Exa 搜索失败: {e}")
return []
async def _search_tavily(self, args: Dict[str, Any]) -> List[Dict[str, Any]]:
query = args["query"]
num_results = args.get("num_results", 3)
time_range = args.get("time_range", "any")
try:
if not self.tavily_key_cycle:
return []
# 使用轮询机制获取下一个客户端
tavily_client = next(self.tavily_key_cycle)
# 构建Tavily搜索参数
search_params = {
"query": query,
"max_results": num_results,
"search_depth": "basic",
"include_answer": False,
"include_raw_content": False
}
# 根据时间范围调整搜索参数
if time_range == "week":
search_params["days"] = 7
elif time_range == "month":
search_params["days"] = 30
loop = asyncio.get_running_loop()
func = functools.partial(tavily_client.search, **search_params)
search_response = await loop.run_in_executor(None, func)
results = []
if search_response and "results" in search_response:
for res in search_response["results"]:
results.append({
"title": res.get("title", "无标题"),
"url": res.get("url", ""),
"snippet": res.get("content", "")[:300] + "..." if res.get("content") else "无摘要",
"provider": "Tavily"
})
return results
except Exception as e:
logger.error(f"Tavily 搜索失败: {e}")
return []
async def _search_ddg(self, args: Dict[str, Any]) -> List[Dict[str, Any]]:
query = args["query"]
num_results = args.get("num_results", 3)
try:
async with aDDGS() as ddgs:
search_response = await ddgs.text(query, max_results=num_results)
return [
{
"title": r.get("title"),
"url": r.get("href"),
"snippet": r.get("body"),
"provider": "DuckDuckGo"
}
for r in search_response
]
except Exception as e:
logger.error(f"DuckDuckGo 搜索失败: {e}")
return []
def _format_results(self, results: List[Dict[str, Any]]) -> str:
if not results:
return "没有找到相关的网络信息。"
formatted_string = "根据网络搜索结果:\n\n"
for i, res in enumerate(results, 1):
title = res.get("title", '无标题')
url = res.get("url", '#')
snippet = res.get("snippet", '无摘要')
provider = res.get("provider", "未知来源")
formatted_string += f"{i}. **{title}** (来自: {provider})\n"
formatted_string += f" - 摘要: {snippet}\n"
formatted_string += f" - 来源: {url}\n\n"
return formatted_string
class URLParserTool(BaseTool):
"""
一个用于解析和总结一个或多个网页URL内容的工具。
"""
name: str = "parse_url"
description: str = "当需要理解一个或多个特定网页链接的内容时,使用此工具。例如:'这些网页讲了什么?[https://example.com, https://example2.com]''帮我总结一下这些文章'"
available_for_llm: bool = True
parameters = [
("urls", ToolParamType.STRING, "要理解的网站", True, None),
]
def __init__(self, plugin_config=None):
super().__init__(plugin_config)
# 初始化EXA API密钥轮询器
self.exa_clients = []
self.exa_key_cycle = None
# 优先从主配置文件读取,如果没有则从插件配置文件读取
EXA_API_KEYS = config_api.get_global_config("exa.api_keys", None)
if EXA_API_KEYS is None:
# 从插件配置文件读取
EXA_API_KEYS = self.get_config("exa.api_keys", [])
if isinstance(EXA_API_KEYS, list) and EXA_API_KEYS:
valid_keys = [key.strip() for key in EXA_API_KEYS if isinstance(key, str) and key.strip() not in ("None", "")]
if valid_keys:
self.exa_clients = [Exa(api_key=key) for key in valid_keys]
self.exa_key_cycle = itertools.cycle(self.exa_clients)
logger.info(f"URL解析工具已配置 {len(valid_keys)} 个 Exa API 密钥")
else:
logger.warning("Exa API Keys 配置无效URL解析功能将受限。")
else:
logger.warning("Exa API Keys 未配置URL解析功能将受限。")
async def _local_parse_and_summarize(self, url: str) -> Dict[str, Any]:
"""
使用本地库(httpx, BeautifulSoup)解析URL并调用LLM进行总结。
"""
try:
# 读取代理配置
enable_proxy = self.get_config("proxy.enable_proxy", False)
proxies = None
if enable_proxy:
socks5_proxy = self.get_config("proxy.socks5_proxy", None)
http_proxy = self.get_config("proxy.http_proxy", None)
https_proxy = self.get_config("proxy.https_proxy", None)
# 优先使用SOCKS5代理全协议代理
if socks5_proxy:
proxies = socks5_proxy
logger.info(f"使用SOCKS5代理: {socks5_proxy}")
elif http_proxy or https_proxy:
proxies = {}
if http_proxy:
proxies["http://"] = http_proxy
if https_proxy:
proxies["https://"] = https_proxy
logger.info(f"使用HTTP/HTTPS代理配置: {proxies}")
client_kwargs = {"timeout": 15.0, "follow_redirects": True}
if proxies:
client_kwargs["proxies"] = proxies
async with httpx.AsyncClient(**client_kwargs) as client:
response = await client.get(url)
response.raise_for_status()
soup = BeautifulSoup(response.text, "html.parser")
title = soup.title.string if soup.title else "无标题"
for script in soup(["script", "style"]):
script.extract()
text = soup.get_text(separator="\n", strip=True)
if not text:
return {"error": "无法从页面提取有效文本内容。"}
summary_prompt = f"请根据以下网页内容生成一段不超过300字的中文摘要保留核心信息和关键点:\n\n---\n\n标题: {title}\n\n内容:\n{text[:4000]}\n\n---\n\n摘要:"
text_model = str(self.get_config("models.text_model", "replyer_1"))
models = llm_api.get_available_models()
model_config = models.get(text_model)
if not model_config:
logger.error("未配置LLM模型")
return {"error": "未配置LLM模型"}
success, summary, reasoning, model_name = await llm_api.generate_with_model(
prompt=summary_prompt,
model_config=model_config,
request_type="story.generate",
temperature=0.3,
max_tokens=1000
)
if not success:
logger.info(f"生成摘要失败: {summary}")
return {"error": "发生ai错误"}
logger.info(f"成功生成摘要内容:'{summary}'")
return {
"title": title,
"url": url,
"snippet": summary,
"source": "local"
}
except httpx.HTTPStatusError as e:
logger.warning(f"本地解析URL '{url}' 失败 (HTTP {e.response.status_code})")
return {"error": f"请求失败,状态码: {e.response.status_code}"}
except Exception as e:
logger.error(f"本地解析或总结URL '{url}' 时发生未知异常: {e}", exc_info=True)
return {"error": f"发生未知错误: {str(e)}"}
async def execute(self, function_args: Dict[str, Any]) -> Dict[str, Any]:
"""
执行URL内容提取和总结。优先使用Exa失败后尝试本地解析。
"""
# 获取当前文件路径用于缓存键
import os
current_file_path = os.path.abspath(__file__)
# 检查缓存
cached_result = await tool_cache.get(self.name, function_args, current_file_path)
if cached_result:
logger.info(f"缓存命中: {self.name} -> {function_args}")
return cached_result
urls_input = function_args.get("urls")
if not urls_input:
return {"error": "URL列表不能为空。"}
# 处理URL输入确保是列表格式
if isinstance(urls_input, str):
# 如果是字符串尝试解析为URL列表
import re
# 提取所有HTTP/HTTPS URL
url_pattern = r'https?://[^\s\],]+'
urls = re.findall(url_pattern, urls_input)
if not urls:
# 如果没有找到标准URL将整个字符串作为单个URL
if urls_input.strip().startswith(('http://', 'https://')):
urls = [urls_input.strip()]
else:
return {"error": "提供的字符串中未找到有效的URL。"}
elif isinstance(urls_input, list):
urls = [url.strip() for url in urls_input if isinstance(url, str) and url.strip()]
else:
return {"error": "URL格式不正确应为字符串或列表。"}
# 验证URL格式
valid_urls = []
for url in urls:
if url.startswith(('http://', 'https://')):
valid_urls.append(url)
else:
logger.warning(f"跳过无效URL: {url}")
if not valid_urls:
return {"error": "未找到有效的URL。"}
urls = valid_urls
logger.info(f"准备解析 {len(urls)} 个URL: {urls}")
successful_results = []
error_messages = []
urls_to_retry_locally = []
# 步骤 1: 尝试使用 Exa API 进行解析
contents_response = None
if self.exa_key_cycle:
logger.info(f"开始使用 Exa API 解析URL: {urls}")
try:
# 使用轮询机制获取下一个客户端
exa_client = next(self.exa_key_cycle)
loop = asyncio.get_running_loop()
exa_params = {"text": True, "summary": True, "highlights": True}
func = functools.partial(exa_client.get_contents, urls, **exa_params)
contents_response = await loop.run_in_executor(None, func)
except Exception as e:
logger.error(f"执行 Exa URL解析时发生严重异常: {e}", exc_info=True)
contents_response = None # 确保异常后为None
# 步骤 2: 处理Exa的响应
if contents_response and hasattr(contents_response, 'statuses'):
results_map = {res.url: res for res in contents_response.results} if hasattr(contents_response, 'results') else {}
if contents_response.statuses:
for status in contents_response.statuses:
if status.status == 'success':
res = results_map.get(status.id)
if res:
summary = getattr(res, 'summary', '')
highlights = " ".join(getattr(res, 'highlights', []))
text_snippet = (getattr(res, 'text', '')[:300] + '...') if getattr(res, 'text', '') else ''
snippet = summary or highlights or text_snippet or '无摘要'
successful_results.append({
"title": getattr(res, 'title', '无标题'),
"url": getattr(res, 'url', status.id),
"snippet": snippet,
"source": "exa"
})
else:
error_tag = getattr(status, 'error', '未知错误')
logger.warning(f"Exa解析URL '{status.id}' 失败: {error_tag}。准备本地重试。")
urls_to_retry_locally.append(status.id)
else:
# 如果Exa未配置、API调用失败或返回无效响应则所有URL都进入本地重试
urls_to_retry_locally.extend(url for url in urls if url not in [res['url'] for res in successful_results])
# 步骤 3: 对失败的URL进行本地解析
if urls_to_retry_locally:
logger.info(f"开始本地解析以下URL: {urls_to_retry_locally}")
local_tasks = [self._local_parse_and_summarize(url) for url in urls_to_retry_locally]
local_results = await asyncio.gather(*local_tasks)
for i, res in enumerate(local_results):
url = urls_to_retry_locally[i]
if "error" in res:
error_messages.append(f"URL: {url} - 解析失败: {res['error']}")
else:
successful_results.append(res)
if not successful_results:
return {"error": "无法从所有给定的URL获取内容。", "details": error_messages}
formatted_content = self._format_results(successful_results)
result = {
"type": "url_parse_result",
"content": formatted_content,
"errors": error_messages
}
# 保存到缓存
if "error" not in result:
await tool_cache.set(self.name, function_args, self.__class__, result)
return result
def _format_results(self, results: List[Dict[str, Any]]) -> str:
"""
将成功解析的结果列表格式化为一段简洁的文本。
"""
formatted_parts = []
for res in results:
title = res.get('title', '无标题')
url = res.get('url', '#')
snippet = res.get('snippet', '无摘要')
source = res.get('source', '未知')
formatted_string = f"**{title}**\n"
formatted_string += f"**内容摘要**:\n{snippet}\n"
formatted_string += f"**来源**: {url} (由 {source} 解析)\n"
formatted_parts.append(formatted_string)
return "\n---\n".join(formatted_parts)
@register_plugin
class WEBSEARCHPLUGIN(BasePlugin):
# 插件基本信息
plugin_name: str = "web_search_tool" # 内部标识符
enable_plugin: bool = True
dependencies: List[str] = [] # 插件依赖列表
# Python包依赖列表 - 支持两种格式:
# 方式1: 简单字符串列表(向后兼容)
# python_dependencies: List[str] = ["asyncddgs", "exa_py", "httpx[socks]"]
# 方式2: 详细的PythonDependency对象推荐
python_dependencies: List[PythonDependency] = [
PythonDependency(
package_name="asyncddgs",
description="异步DuckDuckGo搜索库",
optional=False
),
PythonDependency(
package_name="exa_py",
description="Exa搜索API客户端库",
optional=True # 如果没有API密钥这个是可选的
),
PythonDependency(
package_name="tavily",
install_name="tavily-python", # 安装时使用这个名称
description="Tavily搜索API客户端库",
optional=True # 如果没有API密钥这个是可选的
),
PythonDependency(
package_name="httpx",
version=">=0.20.0",
install_name="httpx[socks]", # 安装时使用这个名称(包含可选依赖)
description="支持SOCKS代理的HTTP客户端库",
optional=False
)
]
config_file_name: str = "config.toml" # 配置文件名
# 配置节描述
config_section_descriptions = {"plugin": "插件基本信息", "proxy": "链接本地解析代理配置"}
# 配置Schema定义
# 注意EXA配置和组件设置已迁移到主配置文件(bot_config.toml)的[exa]和[web_search]部分
config_schema: dict = {
"plugin": {
"name": ConfigField(type=str, default="WEB_SEARCH_PLUGIN", description="插件名称"),
"version": ConfigField(type=str, default="1.0.0", description="插件版本"),
"enabled": ConfigField(type=bool, default=False, description="是否启用插件"),
},
"proxy": {
"http_proxy": ConfigField(type=str, default=None, description="HTTP代理地址格式如: http://proxy.example.com:8080"),
"https_proxy": ConfigField(type=str, default=None, description="HTTPS代理地址格式如: http://proxy.example.com:8080"),
"socks5_proxy": ConfigField(type=str, default=None, description="SOCKS5代理地址格式如: socks5://proxy.example.com:1080"),
"enable_proxy": ConfigField(type=bool, default=False, description="是否启用代理")
},
}
def get_plugin_components(self) -> List[Tuple[ComponentInfo, Type]]:
enable_tool =[]
# 从主配置文件读取组件启用配置
if config_api.get_global_config("web_search.enable_web_search_tool", True):
enable_tool.append((WebSurfingTool.get_tool_info(), WebSurfingTool))
if config_api.get_global_config("web_search.enable_url_tool", True):
enable_tool.append((URLParserTool.get_tool_info(), URLParserTool))
return enable_tool

View File

@@ -1,16 +1,22 @@
# mmc/src/schedule/plan_generator.py
import json
import random
from typing import List
from pydantic import BaseModel, ValidationError
from json_repair import repair_json
from src.config.config import global_config, model_config
from src.llm_models.model_client.base_client import client_registry
from src.llm_models.payload_content.message import Message, RoleType
from src.llm_models.payload_content.resp_format import RespFormat, RespFormatType
from src.llm_models.utils_model import LLMRequest
from src.common.logger import get_logger
logger = get_logger("plan_generator")
class PlanResponse(BaseModel):
"""
用于验证月度计划LLM响应的Pydantic模型。
"""
plans: List[str]
class PlanGenerator:
"""
负责生成月度计划。
@@ -18,6 +24,8 @@ class PlanGenerator:
def __init__(self):
self.bot_personality = self._get_bot_personality()
task_config = model_config.model_task_config.get_task("monthly_plan_generator")
self.llm_request = LLMRequest(model_set=task_config, request_type="monthly_plan_generator")
def _get_bot_personality(self) -> str:
"""
@@ -61,56 +69,46 @@ class PlanGenerator:
:return: 生成的计划文本列表
"""
try:
# 1. 获取模型任务配置
task_config = model_config.model_task_config.get_task("monthly_plan_generator")
# 2. 随机选择一个模型
model_name = random.choice(task_config.model_list)
model_info = model_config.get_model_info(model_name)
api_provider = model_config.get_provider(model_info.api_provider)
# 3. 获取客户端实例
llm_client = client_registry.get_client_class_instance(api_provider)
# 4. 构建Prompt和消息体
# 1. 构建Prompt
prompt = self._build_prompt(year, month, count)
message_list = [Message(role=RoleType.User, content=prompt)]
logger.info(f"正在为 {year}-{month} 生成 {count} 个月度计划...")
# 2. 调用LLM
llm_content, (reasoning, model_name, _) = await self.llm_request.generate_response_async(prompt=prompt)
logger.info(f"正在使用模型 '{model_name}' {year}-{month} 生成 {count} 个月度计划...")
# 5. 调用LLM
response = await llm_client.get_response(
model_info=model_info,
message_list=message_list,
temperature=task_config.temperature,
max_tokens=task_config.max_tokens,
response_format=RespFormat(format_type=RespFormatType.JSON_OBJ) # 请求JSON输出
)
if not response or not response.content:
logger.info(f"使用模型 '{model_name}' 生成完成。")
if reasoning:
logger.debug(f"模型推理过程: {reasoning}")
if not llm_content:
logger.error("LLM未能返回有效的计划内容。")
return []
# 6. 解析LLM返回的JSON
# 3. 解析并验证LLM返回的JSON
try:
# 移除可能的Markdown代码块标记
clean_content = response.content.strip()
clean_content = llm_content.strip()
if clean_content.startswith("```json"):
clean_content = clean_content[7:]
if clean_content.endswith("```"):
clean_content = clean_content[:-3]
data = json.loads(clean_content.strip())
plans = data.get("plans", [])
if isinstance(plans, list) and all(isinstance(p, str) for p in plans):
logger.info(f"成功生成并解析了 {len(plans)} 个月度计划。")
return plans
else:
logger.error(f"LLM返回的JSON格式不正确或'plans'键不是字符串列表: {response.content}")
return []
# 修复并解析JSON
repaired_json_str = repair_json(clean_content)
data = json.loads(repaired_json_str)
# 使用Pydantic进行验证
validated_response = PlanResponse.model_validate(data)
plans = validated_response.plans
logger.info(f"成功生成并验证了 {len(plans)} 个月度计划。")
return plans
except json.JSONDecodeError:
logger.error(f"无法解析LLM返回的JSON: {response.content}")
logger.error(f"修复后仍然无法解析LLM返回的JSON: {llm_content}")
return []
except ValidationError as e:
logger.error(f"LLM返回的JSON格式不符合预期: {e}\n原始响应: {llm_content}")
return []
except Exception as e:
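The rewritten parsing path is deliberately more forgiving than a bare `json.loads`: markdown fences are stripped, `json_repair` patches minor syntax damage, and a Pydantic model enforces that the payload really is `{"plans": [str, ...]}`. A standalone sketch of that pipeline (assumes the `json-repair` and Pydantic v2 packages already shown in this file's imports):

```python
import json
from typing import List
from json_repair import repair_json
from pydantic import BaseModel

class PlanResponse(BaseModel):
    plans: List[str]

def parse_plans(llm_content: str) -> List[str]:
    clean = llm_content.strip()
    if clean.startswith("```json"):   # strip a leading markdown fence
        clean = clean[7:]
    if clean.endswith("```"):
        clean = clean[:-3]
    data = json.loads(repair_json(clean))           # repair, then parse
    return PlanResponse.model_validate(data).plans  # enforce the expected shape

# Tolerates a trailing comma that plain json.loads would reject:
assert parse_plans('{"plans": ["读一本书", "练习钢琴",]}') == ["读一本书", "练习钢琴"]
```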

View File

@@ -1,5 +1,5 @@
[inner]
version = "6.4.0"
version = "6.4.1"
#----以下是给开发人员阅读的,如果你只是部署了麦麦,不需要阅读----
#如果你想要修改配置文件请递增version的值
@@ -401,7 +401,7 @@ enable_web_search_tool = true # 是否启用联网搜索tool
enable_url_tool = true # 是否启用URL解析tool
# 搜索引擎配置
enabled_engines = ["ddg"] # 启用的搜索引擎列表,可选: "exa", "tavily", "ddg"
enabled_engines = ["ddg"] # 启用的搜索引擎列表,可选: "exa", "tavily", "ddg","bing"
search_strategy = "single" # 搜索策略: "single"(使用第一个可用引擎), "parallel"(并行使用所有启用的引擎), "fallback"(按顺序尝试,失败则尝试下一个)
[plugins] # 插件配置

View File

@@ -125,6 +125,7 @@ model_list = ["siliconflow-deepseek-v3"] # 使用的模型列表,每个子项
temperature = 0.2 # 模型温度新V3建议0.1-0.3
max_tokens = 800 # 最大输出token数
#concurrency_count = 2 # 并发请求数量默认为1不并发设置为2或更高启用并发
#anti_truncation = true # 启用反截断功能,防止模型输出被截断
[model_task_config.utils_small] # 在麦麦的一些组件中使用的小模型,消耗量较大,建议使用速度较快的小模型
model_list = ["qwen3-8b"]

View File

@@ -1,7 +1,6 @@
import pytest
import asyncio
import time
from unittest.mock import Mock, patch, MagicMock
from unittest.mock import Mock, patch
import sys
import os