【迁移】工具系统再完善:工具缓存、ttl支持、自动记录、长期保存、自动清理缓存、将记录与执行分离、api记录查询、时间聊天工具筛选查询...

This commit is contained in:
Windpicker-owo
2025-08-27 11:04:20 +08:00
parent 434c174dcc
commit 80ab0c687c
9 changed files with 604 additions and 168 deletions

4
bot.py
View File

@@ -31,7 +31,7 @@ from src.manager.async_task_manager import async_task_manager # noqa
from src.config.config import global_config # noqa from src.config.config import global_config # noqa
from src.common.database.database import initialize_sql_database # noqa from src.common.database.database import initialize_sql_database # noqa
from src.common.database.sqlalchemy_models import initialize_database as init_db # noqa from src.common.database.sqlalchemy_models import initialize_database as init_db # noqa
from src.common.tool_history import wrap_tool_executor #noqa
logger = get_logger("main") logger = get_logger("main")
@@ -240,6 +240,8 @@ class MaiBotMain(BaseMain):
self.setup_timezone() self.setup_timezone()
self.check_and_confirm_eula() self.check_and_confirm_eula()
self.initialize_database() self.initialize_database()
# 初始化工具历史记录
wrap_tool_executor()
return self.create_main_system() return self.create_main_system()

View File

@@ -235,7 +235,7 @@ class DefaultReplyer:
from src.plugin_system.core.tool_use import ToolExecutor # 延迟导入ToolExecutor不然会循环依赖 from src.plugin_system.core.tool_use import ToolExecutor # 延迟导入ToolExecutor不然会循环依赖
self.tool_executor = ToolExecutor(chat_id=self.chat_stream.stream_id, enable_cache=False) self.tool_executor = ToolExecutor(chat_id=self.chat_stream.stream_id)
async def _build_cross_context_block(self, current_chat_id: str, target_user_info: Optional[Dict[str, Any]]) -> str: async def _build_cross_context_block(self, current_chat_id: str, target_user_info: Optional[Dict[str, Any]]) -> str:
"""构建跨群聊上下文""" """构建跨群聊上下文"""

View File

@@ -7,11 +7,32 @@ from contextlib import asynccontextmanager
from typing import Dict, Any, Optional, List, Union from typing import Dict, Any, Optional, List, Union
from src.common.logger import get_logger from src.common.logger import get_logger
from src.common.tool_history import ToolHistoryManager
install(extra_lines=3) install(extra_lines=3)
logger = get_logger("prompt_build") logger = get_logger("prompt_build")
# 创建工具历史管理器实例
tool_history_manager = ToolHistoryManager()
def get_tool_history_prompt(message_id: Optional[str] = None) -> str:
"""获取工具历史提示词
Args:
message_id: 会话ID, 用于只获取当前会话的历史
Returns:
格式化的工具历史提示词
"""
from src.config.config import global_config
if not global_config.tool.history.enable_prompt_history:
return ""
return tool_history_manager.get_recent_history_prompt(
chat_id=message_id
)
class PromptContext: class PromptContext:
def __init__(self): def __init__(self):
@@ -136,8 +157,37 @@ class PromptManager:
return prompt return prompt
async def format_prompt(self, name: str, **kwargs) -> str: async def format_prompt(self, name: str, **kwargs) -> str:
# 获取当前提示词
prompt = await self.get_prompt_async(name) prompt = await self.get_prompt_async(name)
return prompt.format(**kwargs) # 获取当前会话ID
message_id = self._context._current_context
# 获取工具历史提示词
tool_history = ""
if name in ['action_prompt', 'replyer_prompt', 'planner_prompt', 'tool_executor_prompt']:
tool_history = get_tool_history_prompt(message_id)
# 获取基本格式化结果
result = prompt.format(**kwargs)
# 如果有工具历史,插入到适当位置
if tool_history:
# 查找合适的插入点
# 在人格信息和身份块之后,但在主要内容之前
identity_end = result.find("```\n现在,你说:")
if identity_end == -1:
# 如果找不到特定标记,尝试在第一个段落后插入
first_double_newline = result.find("\n\n")
if first_double_newline != -1:
# 在第一个双换行后插入
result = f"{result[:first_double_newline + 2]}{tool_history}\n{result[first_double_newline + 2:]}"
else:
# 如果找不到合适的位置,添加到开头
result = f"{tool_history}\n\n{result}"
else:
# 在找到的位置插入
result = f"{result[:identity_end]}\n{tool_history}\n{result[identity_end:]}"
return result
# 全局单例 # 全局单例

385
src/common/tool_history.py Normal file
View File

@@ -0,0 +1,385 @@
"""工具执行历史记录模块"""
import functools
import time
from datetime import datetime, timedelta
from typing import Any, Dict, List, Optional, Union
import json
from pathlib import Path
import asyncio
from .logger import get_logger
from src.config.config import global_config
logger = get_logger("tool_history")
class ToolHistoryManager:
"""工具执行历史记录管理器"""
_instance = None
_initialized = False
def __new__(cls):
if cls._instance is None:
cls._instance = super().__new__(cls)
return cls._instance
def __init__(self):
if not self._initialized:
self._history: List[Dict[str, Any]] = []
self._initialized = True
self._data_dir = Path("data/tool_history")
self._data_dir.mkdir(parents=True, exist_ok=True)
self._history_file = self._data_dir / "tool_history.jsonl"
self._load_history()
def _save_history(self):
"""保存所有历史记录到文件"""
try:
with self._history_file.open("w", encoding="utf-8") as f:
for record in self._history:
f.write(json.dumps(record, ensure_ascii=False) + "\n")
except Exception as e:
logger.error(f"保存工具调用记录失败: {e}")
def _save_record(self, record: Dict[str, Any]):
"""保存单条记录到文件"""
try:
with self._history_file.open("a", encoding="utf-8") as f:
f.write(json.dumps(record, ensure_ascii=False) + "\n")
except Exception as e:
logger.error(f"保存工具调用记录失败: {e}")
def _clean_expired_records(self):
"""清理已过期的记录"""
original_count = len(self._history)
self._history = [record for record in self._history if record.get("ttl_count", 0) < record.get("ttl", 5)]
cleaned_count = original_count - len(self._history)
if cleaned_count > 0:
logger.info(f"清理了 {cleaned_count} 条过期的工具历史记录,剩余 {len(self._history)}")
self._save_history()
else:
logger.debug("没有需要清理的过期工具历史记录")
def record_tool_call(self,
tool_name: str,
args: Dict[str, Any],
result: Any,
execution_time: float,
status: str,
chat_id: Optional[str] = None,
ttl: int = 5):
"""记录工具调用
Args:
tool_name: 工具名称
args: 工具调用参数
result: 工具返回结果
execution_time: 执行时间(秒)
status: 执行状态("completed""error")
chat_id: 聊天ID与ChatManager中的chat_id对应用于标识群聊或私聊会话
ttl: 该记录的生命周期值插入提示词多少次后删除默认为5
"""
# 检查是否启用历史记录且ttl大于0
if not global_config.tool.history.enable_history or ttl <= 0:
return
# 先清理过期记录
self._clean_expired_records()
try:
# 创建记录
record = {
"tool_name": tool_name,
"timestamp": datetime.now().isoformat(),
"arguments": self._sanitize_args(args),
"result": self._sanitize_result(result),
"execution_time": execution_time,
"status": status,
"chat_id": chat_id,
"ttl": ttl,
"ttl_count": 0
}
# 添加到内存中的历史记录
self._history.append(record)
# 保存到文件
self._save_record(record)
if status == "completed":
logger.info(f"工具 {tool_name} 调用完成,耗时:{execution_time:.2f}s")
else:
logger.error(f"工具 {tool_name} 调用失败:{result}")
except Exception as e:
logger.error(f"记录工具调用时发生错误: {e}")
def find_cached_result(self, tool_name: str, args: Dict[str, Any]) -> Optional[Dict[str, Any]]:
"""查找匹配的缓存记录
Args:
tool_name: 工具名称
args: 工具调用参数
Returns:
Optional[Dict[str, Any]]: 如果找到匹配的缓存记录则返回结果否则返回None
"""
# 检查是否启用历史记录
if not global_config.tool.history.enable_history:
return None
# 清理输入参数中的敏感信息以便比较
sanitized_input_args = self._sanitize_args(args)
# 按时间倒序遍历历史记录
for record in reversed(self._history):
if (record["tool_name"] == tool_name and
record["status"] == "completed" and
record["ttl_count"] < record.get("ttl", 5)):
# 比较参数是否匹配
if self._sanitize_args(record["arguments"]) == sanitized_input_args:
logger.info(f"工具 {tool_name} 命中缓存记录")
return record["result"]
return None
def _sanitize_args(self, args: Dict[str, Any]) -> Dict[str, Any]:
"""清理参数中的敏感信息"""
sensitive_keys = ['api_key', 'token', 'password', 'secret']
sanitized = args.copy()
def _sanitize_value(value):
if isinstance(value, dict):
return {k: '***' if k.lower() in sensitive_keys else _sanitize_value(v)
for k, v in value.items()}
return value
return {k: '***' if k.lower() in sensitive_keys else _sanitize_value(v)
for k, v in sanitized.items()}
def _sanitize_result(self, result: Any) -> Any:
"""清理结果中的敏感信息"""
if isinstance(result, dict):
return self._sanitize_args(result)
return result
def _load_history(self):
"""加载历史记录文件"""
try:
if self._history_file.exists():
self._history = []
with self._history_file.open("r", encoding="utf-8") as f:
for line in f:
try:
record = json.loads(line)
if record.get("ttl_count", 0) < record.get("ttl", 5): # 只加载未过期的记录
self._history.append(record)
except json.JSONDecodeError:
continue
logger.info(f"成功加载了 {len(self._history)} 条历史记录")
except Exception as e:
logger.error(f"加载历史记录失败: {e}")
def query_history(self,
tool_names: Optional[List[str]] = None,
start_time: Optional[Union[datetime, str]] = None,
end_time: Optional[Union[datetime, str]] = None,
chat_id: Optional[str] = None,
limit: Optional[int] = None,
status: Optional[str] = None) -> List[Dict[str, Any]]:
"""查询工具调用历史
Args:
tool_names: 工具名称列表,为空则查询所有工具
start_time: 开始时间可以是datetime对象或ISO格式字符串
end_time: 结束时间可以是datetime对象或ISO格式字符串
chat_id: 聊天ID与ChatManager中的chat_id对应用于查询特定群聊或私聊的历史记录
limit: 返回记录数量限制
status: 执行状态筛选("completed""error")
Returns:
符合条件的历史记录列表
"""
# 先清理过期记录
self._clean_expired_records()
def _parse_time(time_str: Optional[Union[datetime, str]]) -> Optional[datetime]:
if isinstance(time_str, datetime):
return time_str
elif isinstance(time_str, str):
return datetime.fromisoformat(time_str)
return None
filtered_history = self._history
# 按工具名筛选
if tool_names:
filtered_history = [
record for record in filtered_history
if record["tool_name"] in tool_names
]
# 按时间范围筛选
start_dt = _parse_time(start_time)
end_dt = _parse_time(end_time)
if start_dt:
filtered_history = [
record for record in filtered_history
if datetime.fromisoformat(record["timestamp"]) >= start_dt
]
if end_dt:
filtered_history = [
record for record in filtered_history
if datetime.fromisoformat(record["timestamp"]) <= end_dt
]
# 按聊天ID筛选
if chat_id:
filtered_history = [
record for record in filtered_history
if record.get("chat_id") == chat_id
]
# 按状态筛选
if status:
filtered_history = [
record for record in filtered_history
if record["status"] == status
]
# 应用数量限制
if limit:
filtered_history = filtered_history[-limit:]
return filtered_history
def get_recent_history_prompt(self,
limit: Optional[int] = None,
chat_id: Optional[str] = None) -> str:
"""
获取最近工具调用历史的提示词
Args:
limit: 返回的历史记录数量,如果不提供则使用配置中的max_history
chat_id: 会话ID用于只获取当前会话的历史
Returns:
格式化的历史记录提示词
"""
# 检查是否启用历史记录
if not global_config.tool.history.enable_history:
return ""
# 使用配置中的最大历史记录数
if limit is None:
limit = global_config.tool.history.max_history
recent_history = self.query_history(
chat_id=chat_id,
limit=limit
)
if not recent_history:
return ""
prompt = "\n工具执行历史:\n"
needs_save = False
updated_history = []
for record in recent_history:
# 增加ttl计数
record["ttl_count"] = record.get("ttl_count", 0) + 1
needs_save = True
# 如果未超过ttl则添加到提示词中
if record["ttl_count"] < record.get("ttl", 5):
# 提取结果中的name和content
result = record['result']
if isinstance(result, dict):
name = result.get('name', record['tool_name'])
content = result.get('content', str(result))
else:
name = record['tool_name']
content = str(result)
# 格式化内容,去除多余空白和换行
content = content.strip().replace('\n', ' ')
# 如果内容太长则截断
if len(content) > 200:
content = content[:200] + "..."
prompt += f"{name}: \n{content}\n\n"
updated_history.append(record)
# 更新历史记录并保存
if needs_save:
self._history = updated_history
self._save_history()
return prompt
def clear_history(self):
"""清除历史记录"""
self._history.clear()
self._save_history()
logger.info("工具调用历史记录已清除")
def wrap_tool_executor():
"""
包装工具执行器以添加历史记录功能
这个函数应该在系统启动时被调用一次
"""
from src.plugin_system.core.tool_use import ToolExecutor
original_execute = ToolExecutor.execute_tool_call
history_manager = ToolHistoryManager()
async def wrapped_execute_tool_call(self, tool_call, tool_instance=None):
start_time = time.time()
# 首先检查缓存
if cached_result := history_manager.find_cached_result(tool_call.func_name, tool_call.args):
logger.info(f"{self.log_prefix}使用缓存结果,跳过工具 {tool_call.func_name} 执行")
return cached_result
try:
result = await original_execute(self, tool_call, tool_instance)
execution_time = time.time() - start_time
# 获取工具的ttl值
ttl = getattr(tool_instance, 'history_ttl', 5) if tool_instance else 5
# 记录成功的调用
history_manager.record_tool_call(
tool_name=tool_call.func_name,
args=tool_call.args,
result=result,
execution_time=execution_time,
status="completed",
chat_id=getattr(self, 'chat_id', None),
ttl=ttl
)
return result
except Exception as e:
execution_time = time.time() - start_time
# 获取工具的ttl值
ttl = getattr(tool_instance, 'history_ttl', 5) if tool_instance else 5
# 记录失败的调用
history_manager.record_tool_call(
tool_name=tool_call.func_name,
args=tool_call.args,
result=str(e),
execution_time=execution_time,
status="error",
chat_id=getattr(self, 'chat_id', None),
ttl=ttl
)
raise
# 替换原始方法
ToolExecutor.execute_tool_call = wrapped_execute_tool_call

View File

@@ -396,13 +396,29 @@ class ExpressionConfig(ValidatedConfigBase):
return None return None
class ToolHistoryConfig(ValidatedConfigBase):
"""工具历史记录配置类"""
enable_history: bool = True
"""是否启用工具历史记录"""
enable_prompt_history: bool = True
"""是否在提示词中加入工具历史记录"""
max_history: int = 5
"""注入到提示词中的最大工具历史记录数量"""
data_dir: str = "data/tool_history"
"""历史记录保存目录"""
class ToolConfig(ValidatedConfigBase): class ToolConfig(ValidatedConfigBase):
"""工具配置类""" """工具配置类"""
enable_tool: bool = Field(default=False, description="启用工具") enable_tool: bool = Field(default=False, description="启用工具")
history: ToolHistoryConfig = Field(default_factory=ToolHistoryConfig)
"""工具历史记录配置"""
class VoiceConfig(ValidatedConfigBase): class VoiceConfig(ValidatedConfigBase):
"""语音识别配置类""" """语音识别配置类"""

View File

@@ -1,7 +1,9 @@
from typing import Optional, Type from typing import Any, Dict, List, Optional, Type, Union
from datetime import datetime
from src.plugin_system.base.base_tool import BaseTool from src.plugin_system.base.base_tool import BaseTool
from src.plugin_system.base.component_types import ComponentType from src.plugin_system.base.component_types import ComponentType
from src.common.tool_history import ToolHistoryManager
from src.common.logger import get_logger from src.common.logger import get_logger
logger = get_logger("tool_api") logger = get_logger("tool_api")
@@ -32,3 +34,109 @@ def get_llm_available_tool_definitions():
llm_available_tools = component_registry.get_llm_available_tools() llm_available_tools = component_registry.get_llm_available_tools()
return [(name, tool_class.get_tool_definition()) for name, tool_class in llm_available_tools.items()] return [(name, tool_class.get_tool_definition()) for name, tool_class in llm_available_tools.items()]
def get_tool_history(
tool_names: Optional[List[str]] = None,
start_time: Optional[Union[datetime, str]] = None,
end_time: Optional[Union[datetime, str]] = None,
chat_id: Optional[str] = None,
limit: Optional[int] = None,
status: Optional[str] = None
) -> List[Dict[str, Any]]:
"""
获取工具调用历史记录
Args:
tool_names: 工具名称列表,为空则查询所有工具
start_time: 开始时间可以是datetime对象或ISO格式字符串
end_time: 结束时间可以是datetime对象或ISO格式字符串
chat_id: 会话ID用于筛选特定会话的调用
limit: 返回记录数量限制
status: 执行状态筛选("completed""error")
Returns:
List[Dict]: 工具调用记录列表,每条记录包含以下字段:
- tool_name: 工具名称
- timestamp: 调用时间
- arguments: 调用参数
- result: 调用结果
- execution_time: 执行时间
- status: 执行状态
- chat_id: 会话ID
"""
history_manager = ToolHistoryManager()
return history_manager.query_history(
tool_names=tool_names,
start_time=start_time,
end_time=end_time,
chat_id=chat_id,
limit=limit,
status=status
)
def get_tool_history_text(
tool_names: Optional[List[str]] = None,
start_time: Optional[Union[datetime, str]] = None,
end_time: Optional[Union[datetime, str]] = None,
chat_id: Optional[str] = None,
limit: Optional[int] = None,
status: Optional[str] = None
) -> str:
"""
获取工具调用历史记录的文本格式
Args:
tool_names: 工具名称列表,为空则查询所有工具
start_time: 开始时间可以是datetime对象或ISO格式字符串
end_time: 结束时间可以是datetime对象或ISO格式字符串
chat_id: 会话ID用于筛选特定会话的调用
limit: 返回记录数量限制
status: 执行状态筛选("completed""error")
Returns:
str: 格式化的工具调用历史记录文本
"""
history = get_tool_history(
tool_names=tool_names,
start_time=start_time,
end_time=end_time,
chat_id=chat_id,
limit=limit,
status=status
)
if not history:
return "没有找到工具调用记录"
text = "工具调用历史记录:\n"
for record in history:
# 提取结果中的name和content
result = record['result']
if isinstance(result, dict):
name = result.get('name', record['tool_name'])
content = result.get('content', str(result))
else:
name = record['tool_name']
content = str(result)
# 格式化内容
content = content.strip().replace('\n', ' ')
if len(content) > 200:
content = content[:200] + "..."
# 格式化时间
timestamp = datetime.fromisoformat(record['timestamp']).strftime("%Y-%m-%d %H:%M:%S")
text += f"[{timestamp}] {name}\n"
text += f"结果: {content}\n\n"
return text
def clear_tool_history() -> None:
"""
清除所有工具调用历史记录
"""
history_manager = ToolHistoryManager()
history_manager.clear_history()

View File

@@ -28,6 +28,8 @@ class BaseTool(ABC):
""" """
available_for_llm: bool = False available_for_llm: bool = False
"""是否可供LLM使用""" """是否可供LLM使用"""
history_ttl: int = 5
"""工具调用历史记录的TTL值默认为5。设为0表示不记录历史"""
def __init__(self, plugin_config: Optional[dict] = None): def __init__(self, plugin_config: Optional[dict] = None):
self.plugin_config = plugin_config or {} # 直接存储插件配置字典 self.plugin_config = plugin_config or {} # 直接存储插件配置字典

View File

@@ -40,13 +40,12 @@ class ToolExecutor:
可以直接输入聊天消息内容,自动判断并执行相应的工具,返回结构化的工具执行结果。 可以直接输入聊天消息内容,自动判断并执行相应的工具,返回结构化的工具执行结果。
""" """
def __init__(self, chat_id: str, enable_cache: bool = False, cache_ttl: int = 3): def __init__(self, chat_id: str):
"""初始化工具执行器 """初始化工具执行器
Args: Args:
executor_id: 执行器标识符,用于日志记录 executor_id: 执行器标识符,用于日志记录
enable_cache: 是否启用缓存机制 chat_id: 聊天标识符,用于日志记录
cache_ttl: 缓存生存时间(周期数)
""" """
self.chat_id = chat_id self.chat_id = chat_id
self.chat_stream = get_chat_manager().get_stream(self.chat_id) self.chat_stream = get_chat_manager().get_stream(self.chat_id)
@@ -54,12 +53,7 @@ class ToolExecutor:
self.llm_model = LLMRequest(model_set=model_config.model_task_config.tool_use, request_type="tool_executor") self.llm_model = LLMRequest(model_set=model_config.model_task_config.tool_use, request_type="tool_executor")
# 缓存配置 logger.info(f"{self.log_prefix}工具执行器初始化完成")
self.enable_cache = enable_cache
self.cache_ttl = cache_ttl
self.tool_cache = {} # 格式: {cache_key: {"result": result, "ttl": ttl, "timestamp": timestamp}}
logger.info(f"{self.log_prefix}工具执行器初始化完成,缓存{'启用' if enable_cache else '禁用'}TTL={cache_ttl}")
async def execute_from_chat_message( async def execute_from_chat_message(
self, target_message: str, chat_history: str, sender: str, return_details: bool = False self, target_message: str, chat_history: str, sender: str, return_details: bool = False
@@ -77,18 +71,6 @@ class ToolExecutor:
如果return_details为True: Tuple[List[Dict], List[str], str] - (结果列表, 使用的工具, 提示词) 如果return_details为True: Tuple[List[Dict], List[str], str] - (结果列表, 使用的工具, 提示词)
""" """
# 首先检查缓存
cache_key = self._generate_cache_key(target_message, chat_history, sender)
if cached_result := self._get_from_cache(cache_key):
logger.info(f"{self.log_prefix}使用缓存结果,跳过工具执行")
if not return_details:
return cached_result, [], ""
# 从缓存结果中提取工具名称
used_tools = [result.get("tool_name", "unknown") for result in cached_result]
return cached_result, used_tools, ""
# 缓存未命中,执行工具调用
# 获取可用工具 # 获取可用工具
tools = self._get_tool_definitions() tools = self._get_tool_definitions()
@@ -117,10 +99,6 @@ class ToolExecutor:
# 执行工具调用 # 执行工具调用
tool_results, used_tools = await self.execute_tool_calls(tool_calls) tool_results, used_tools = await self.execute_tool_calls(tool_calls)
# 缓存结果
if tool_results:
self._set_cache(cache_key, tool_results)
if used_tools: if used_tools:
logger.info(f"{self.log_prefix}工具执行完成,共执行{len(used_tools)}个工具: {used_tools}") logger.info(f"{self.log_prefix}工具执行完成,共执行{len(used_tools)}个工具: {used_tools}")
@@ -151,9 +129,19 @@ class ToolExecutor:
return [], [] return [], []
# 提取tool_calls中的函数名称 # 提取tool_calls中的函数名称
func_names = [call.func_name for call in tool_calls if call.func_name] func_names = []
for call in tool_calls:
try:
if hasattr(call, 'func_name'):
func_names.append(call.func_name)
except Exception as e:
logger.error(f"{self.log_prefix}获取工具名称失败: {e}")
continue
if func_names:
logger.info(f"{self.log_prefix}开始执行工具调用: {func_names}") logger.info(f"{self.log_prefix}开始执行工具调用: {func_names}")
else:
logger.warning(f"{self.log_prefix}未找到有效的工具调用")
# 执行每个工具调用 # 执行每个工具调用
for tool_call in tool_calls: for tool_call in tool_calls:
@@ -216,88 +204,24 @@ class ToolExecutor:
logger.warning(f"未知工具名称: {function_name}") logger.warning(f"未知工具名称: {function_name}")
return None return None
# 执行工具 # 执行工具并记录日志
logger.debug(f"{self.log_prefix}执行工具 {function_name},参数: {function_args}")
result = await tool_instance.execute(function_args) result = await tool_instance.execute(function_args)
if result: if result:
logger.debug(f"{self.log_prefix}工具 {function_name} 执行成功,结果: {result}")
return { return {
"tool_call_id": tool_call.call_id, "tool_call_id": tool_call.call_id,
"role": "tool", "role": "tool",
"name": function_name, "name": function_name,
"type": "function", "type": "function",
"content": result["content"], "content": result.get("content", "")
} }
logger.warning(f"{self.log_prefix}工具 {function_name} 返回空结果")
return None return None
except Exception as e: except Exception as e:
logger.error(f"执行工具调用时发生错误: {str(e)}") logger.error(f"执行工具调用时发生错误: {str(e)}")
raise e raise e
def _generate_cache_key(self, target_message: str, chat_history: str, sender: str) -> str:
"""生成缓存键
Args:
target_message: 目标消息内容
chat_history: 聊天历史
sender: 发送者
Returns:
str: 缓存键
"""
import hashlib
# 使用消息内容和群聊状态生成唯一缓存键
content = f"{target_message}_{chat_history}_{sender}"
return hashlib.md5(content.encode()).hexdigest()
def _get_from_cache(self, cache_key: str) -> Optional[List[Dict]]:
"""从缓存获取结果
Args:
cache_key: 缓存键
Returns:
Optional[List[Dict]]: 缓存的结果如果不存在或过期则返回None
"""
if not self.enable_cache or cache_key not in self.tool_cache:
return None
cache_item = self.tool_cache[cache_key]
if cache_item["ttl"] <= 0:
# 缓存过期,删除
del self.tool_cache[cache_key]
logger.debug(f"{self.log_prefix}缓存过期,删除缓存键: {cache_key}")
return None
# 减少TTL
cache_item["ttl"] -= 1
logger.debug(f"{self.log_prefix}使用缓存结果剩余TTL: {cache_item['ttl']}")
return cache_item["result"]
def _set_cache(self, cache_key: str, result: List[Dict]):
"""设置缓存
Args:
cache_key: 缓存键
result: 要缓存的结果
"""
if not self.enable_cache:
return
self.tool_cache[cache_key] = {"result": result, "ttl": self.cache_ttl, "timestamp": time.time()}
logger.debug(f"{self.log_prefix}设置缓存TTL: {self.cache_ttl}")
def _cleanup_expired_cache(self):
"""清理过期的缓存"""
if not self.enable_cache:
return
expired_keys = []
expired_keys.extend(cache_key for cache_key, cache_item in self.tool_cache.items() if cache_item["ttl"] <= 0)
for key in expired_keys:
del self.tool_cache[key]
if expired_keys:
logger.debug(f"{self.log_prefix}清理了{len(expired_keys)}个过期缓存")
async def execute_specific_tool_simple(self, tool_name: str, tool_args: Dict) -> Optional[Dict]: async def execute_specific_tool_simple(self, tool_name: str, tool_args: Dict) -> Optional[Dict]:
"""直接执行指定工具 """直接执行指定工具
@@ -336,86 +260,30 @@ class ToolExecutor:
return None return None
def clear_cache(self):
"""清空所有缓存"""
if self.enable_cache:
cache_count = len(self.tool_cache)
self.tool_cache.clear()
logger.info(f"{self.log_prefix}清空了{cache_count}个缓存项")
def get_cache_status(self) -> Dict:
"""获取缓存状态信息
Returns:
Dict: 包含缓存统计信息的字典
"""
if not self.enable_cache:
return {"enabled": False, "cache_count": 0}
# 清理过期缓存
self._cleanup_expired_cache()
total_count = len(self.tool_cache)
ttl_distribution = {}
for cache_item in self.tool_cache.values():
ttl = cache_item["ttl"]
ttl_distribution[ttl] = ttl_distribution.get(ttl, 0) + 1
return {
"enabled": True,
"cache_count": total_count,
"cache_ttl": self.cache_ttl,
"ttl_distribution": ttl_distribution,
}
def set_cache_config(self, enable_cache: Optional[bool] = None, cache_ttl: int = -1):
"""动态修改缓存配置
Args:
enable_cache: 是否启用缓存
cache_ttl: 缓存TTL
"""
if enable_cache is not None:
self.enable_cache = enable_cache
logger.info(f"{self.log_prefix}缓存状态修改为: {'启用' if enable_cache else '禁用'}")
if cache_ttl > 0:
self.cache_ttl = cache_ttl
logger.info(f"{self.log_prefix}缓存TTL修改为: {cache_ttl}")
""" """
ToolExecutor使用示例 ToolExecutor使用示例
# 1. 基础使用 - 从聊天消息执行工具启用缓存默认TTL=3 # 1. 基础使用 - 从聊天消息执行工具
executor = ToolExecutor(executor_id="my_executor") executor = ToolExecutor(chat_id=my_chat_id)
results, _, _ = await executor.execute_from_chat_message( results, _, _ = await executor.execute_from_chat_message(
talking_message_str="今天天气怎么样?现在几点了?", target_message="今天天气怎么样?现在几点了?",
is_group_chat=False chat_history="",
sender="用户"
) )
# 2. 禁用缓存的执行器 # 2. 获取详细信息
no_cache_executor = ToolExecutor(executor_id="no_cache", enable_cache=False)
# 3. 自定义缓存TTL
long_cache_executor = ToolExecutor(executor_id="long_cache", cache_ttl=10)
# 4. 获取详细信息
results, used_tools, prompt = await executor.execute_from_chat_message( results, used_tools, prompt = await executor.execute_from_chat_message(
talking_message_str="帮我查询Python相关知识", target_message="帮我查询Python相关知识",
is_group_chat=False, chat_history="",
sender="用户",
return_details=True return_details=True
) )
# 5. 直接执行特定工具 # 3. 直接执行特定工具
result = await executor.execute_specific_tool_simple( result = await executor.execute_specific_tool_simple(
tool_name="get_knowledge", tool_name="get_knowledge",
tool_args={"query": "机器学习"} tool_args={"query": "机器学习"}
) )
# 6. 缓存管理
cache_status = executor.get_cache_status() # 查看缓存状态
executor.clear_cache() # 清空缓存
executor.set_cache_config(cache_ttl=5) # 动态修改缓存配置
""" """

View File

@@ -1,5 +1,5 @@
[inner] [inner]
version = "6.5.2" version = "6.5.3"
#----以下是给开发人员阅读的,如果你只是部署了麦麦,不需要阅读---- #----以下是给开发人员阅读的,如果你只是部署了麦麦,不需要阅读----
#如果你想要修改配置文件请递增version的值 #如果你想要修改配置文件请递增version的值
@@ -213,6 +213,11 @@ willing_mode = "classical" # 回复意愿模式 —— 经典模式classical
[tool] [tool]
enable_tool = true # 是否在普通聊天中启用工具 enable_tool = true # 是否在普通聊天中启用工具
[tool.history]
enable_history = true # 是否启用工具调用历史记录
enable_prompt_history = true # 是否在提示词中加入工具历史记录
max_history = 5 # 每个会话最多保留的历史记录数
[mood] [mood]
enable_mood = true # 是否启用情绪系统 enable_mood = true # 是否启用情绪系统
mood_update_threshold = 1 # 情绪更新阈值,越高,更新越慢 mood_update_threshold = 1 # 情绪更新阈值,越高,更新越慢