This commit is contained in:
tt-P607
2025-08-28 19:24:22 +08:00
18 changed files with 338 additions and 761 deletions

4
bot.py
View File

@@ -31,7 +31,6 @@ from src.manager.async_task_manager import async_task_manager # noqa
from src.config.config import global_config # noqa
from src.common.database.database import initialize_sql_database # noqa
from src.common.database.sqlalchemy_models import initialize_database as init_db # noqa
from src.common.tool_history import wrap_tool_executor #noqa
logger = get_logger("main")
@@ -240,8 +239,7 @@ class MaiBotMain(BaseMain):
self.setup_timezone()
self.check_and_confirm_eula()
self.initialize_database()
# 初始化工具历史记录
wrap_tool_executor()
return self.create_main_system()

View File

@@ -68,15 +68,15 @@ class ExampleAction(BaseAction):
Action采用**两层决策机制**来优化性能和决策质量:
> 设计目的在加载许多插件的时候降低LLM决策压力避免让麦麦在过多的选项中纠结。
> 设计目的在加载许多插件的时候降低LLM决策压力避免让MoFox-Bot在过多的选项中纠结。
**第一层激活控制Activation Control**
激活决定麦麦是否 **“知道”** 这个Action的存在即这个Action是否进入决策候选池。不被激活的Action麦麦永远不会选择。
激活决定MoFox-Bot是否 **“知道”** 这个Action的存在即这个Action是否进入决策候选池。不被激活的ActionMoFox-Bot永远不会选择。
**第二层使用决策Usage Decision**
在Action被激活后使用条件决定麦麦什么时候会 **“选择”** 使用这个Action。
在Action被激活后使用条件决定MoFox-Bot什么时候会 **“选择”** 使用这个Action。
### 决策参数详解 🔧
@@ -84,8 +84,8 @@ Action采用**两层决策机制**来优化性能和决策质量:
| 激活类型 | 说明 | 使用场景 |
| ----------- | ---------------------------------------- | ---------------------- |
| [`NEVER`](#never-激活) | 从不激活Action对麦麦不可见 | 临时禁用某个Action |
| [`ALWAYS`](#always-激活) | 永远激活Action总是在麦麦的候选池中 | 核心功能,如回复、不回复 |
| [`NEVER`](#never-激活) | 从不激活Action对MoFox-Bot不可见 | 临时禁用某个Action |
| [`ALWAYS`](#always-激活) | 永远激活Action总是在MoFox-Bot的候选池中 | 核心功能,如回复、不回复 |
| [`LLM_JUDGE`](#llm_judge-激活) | 通过LLM智能判断当前情境是否需要激活此Action | 需要智能判断的复杂场景 |
| `RANDOM` | 基于随机概率决定是否激活 | 增加行为随机性的功能 |
| `KEYWORD` | 当检测到特定关键词时激活 | 明确触发条件的功能 |
@@ -184,13 +184,13 @@ class GreetingAction(BaseAction):
#### 第二层:使用决策
**在Action被激活后使用条件决定麦麦什么时候会"选择"使用这个Action**
**在Action被激活后使用条件决定MoFox-Bot什么时候会"选择"使用这个Action**
这一层由以下因素综合决定:
- `action_require`使用场景描述帮助LLM判断何时选择
- `action_parameters`所需参数影响Action的可执行性
- 当前聊天上下文和麦麦的决策逻辑
- 当前聊天上下文和MoFox-Bot的决策逻辑
---
@@ -214,11 +214,11 @@ class EmojiAction(BaseAction):
1. **第一层激活判断**
- 使用随机数进行决策,当`random.random() < self.random_activation_probability`时,麦麦才"知道"可以使用这个Action
- 使用随机数进行决策,当`random.random() < self.random_activation_probability`时,MoFox-Bot才"知道"可以使用这个Action
2. **第二层使用决策**
- 即使Action被激活麦麦还会根据 `action_require` 中的条件判断是否真正选择使用
- 例如:如果刚刚已经发过表情,根据"不要连续发送多个表情"的要求,麦麦可能不会选择这个Action
- 即使Action被激活MoFox-Bot还会根据 `action_require` 中的条件判断是否真正选择使用
- 例如:如果刚刚已经发过表情,根据"不要连续发送多个表情"的要求,MoFox-Bot可能不会选择这个Action
---

View File

@@ -100,7 +100,7 @@ class CycleProcessor:
from src.plugin_system.core.event_manager import event_manager
from src.plugin_system.base.component_types import EventType
# 触发 ON_PLAN 事件
result = await event_manager.trigger_event(EventType.ON_PLAN, stream_id=self.context.stream_id)
result = await event_manager.trigger_event(EventType.ON_PLAN, plugin_name="SYSTEM", stream_id=self.context.stream_id)
if result and not result.all_continue_process():
return

View File

@@ -437,7 +437,7 @@ class ChatBot:
logger.info(f"命令处理完成,跳过后续消息处理: {cmd_result}")
return
result = await event_manager.trigger_event(EventType.ON_MESSAGE,message=message)
result = await event_manager.trigger_event(EventType.ON_MESSAGE,plugin_name="SYSTEM",message=message)
if not result.all_continue_process():
raise UserWarning(f"插件{result.get_summary().get('stopped_handlers','')}于消息到达时取消了消息处理")

View File

@@ -370,7 +370,7 @@ class DefaultReplyer:
from src.plugin_system.core.event_manager import event_manager
if not from_plugin:
result = await event_manager.trigger_event(EventType.POST_LLM,prompt=prompt,stream_id=stream_id)
result = await event_manager.trigger_event(EventType.POST_LLM,plugin_name="SYSTEM",prompt=prompt,stream_id=stream_id)
if not result.all_continue_process():
raise UserWarning(f"插件{result.get_summary().get('stopped_handlers', '')}于请求前中断了内容生成")
@@ -390,7 +390,7 @@ class DefaultReplyer:
}
# 触发 AFTER_LLM 事件
if not from_plugin:
result = await event_manager.trigger_event(EventType.AFTER_LLM,prompt=prompt,llm_response=llm_response,stream_id=stream_id)
result = await event_manager.trigger_event(EventType.AFTER_LLM,plugin_name="SYSTEM",prompt=prompt,llm_response=llm_response,stream_id=stream_id)
if not result.all_continue_process():
raise UserWarning(f"插件{result.get_summary().get('stopped_handlers','')}于请求后取消了内容生成")
except UserWarning as e:

View File

@@ -7,33 +7,11 @@ from contextlib import asynccontextmanager
from typing import Dict, Any, Optional, List, Union
from src.common.logger import get_logger
from src.common.tool_history import ToolHistoryManager
install(extra_lines=3)
logger = get_logger("prompt_build")
# 创建工具历史管理器实例
tool_history_manager = ToolHistoryManager()
def get_tool_history_prompt(message_id: Optional[str] = None) -> str:
"""获取工具历史提示词
Args:
message_id: 会话ID, 用于只获取当前会话的历史
Returns:
格式化的工具历史提示词
"""
from src.config.config import global_config
if not global_config.tool.history.enable_prompt_history:
return ""
return tool_history_manager.get_recent_history_prompt(
chat_id=message_id
)
class PromptContext:
def __init__(self):
self._context_prompts: Dict[str, Dict[str, "Prompt"]] = {}
@@ -49,7 +27,7 @@ class PromptContext:
@_current_context.setter
def _current_context(self, value: Optional[str]):
"""设置当前协程的上下文ID"""
self._current_context_var.set(value)
self._current_context_var.set(value) # type: ignore
@asynccontextmanager
async def async_scope(self, context_id: Optional[str] = None):
@@ -73,7 +51,7 @@ class PromptContext:
# 保存当前协程的上下文值,不影响其他协程
previous_context = self._current_context
# 设置当前协程的新上下文
token = self._current_context_var.set(context_id) if context_id else None
token = self._current_context_var.set(context_id) if context_id else None # type: ignore
else:
# 如果没有提供新上下文,保持当前上下文不变
previous_context = self._current_context
@@ -111,7 +89,8 @@ class PromptContext:
"""异步注册提示模板到指定作用域"""
async with self._context_lock:
if target_context := context_id or self._current_context:
self._context_prompts.setdefault(target_context, {})[prompt.name] = prompt
if prompt.name:
self._context_prompts.setdefault(target_context, {})[prompt.name] = prompt
class PromptManager:
@@ -153,40 +132,15 @@ class PromptManager:
def add_prompt(self, name: str, fstr: str) -> "Prompt":
prompt = Prompt(fstr, name=name)
self._prompts[prompt.name] = prompt
if prompt.name:
self._prompts[prompt.name] = prompt
return prompt
async def format_prompt(self, name: str, **kwargs) -> str:
# 获取当前提示词
prompt = await self.get_prompt_async(name)
# 获取当前会话ID
message_id = self._context._current_context
# 获取工具历史提示词
tool_history = ""
if name in ['action_prompt', 'replyer_prompt', 'planner_prompt', 'tool_executor_prompt']:
tool_history = get_tool_history_prompt(message_id)
# 获取基本格式化结果
result = prompt.format(**kwargs)
# 如果有工具历史,插入到适当位置
if tool_history:
# 查找合适的插入点
# 在人格信息和身份块之后,但在主要内容之前
identity_end = result.find("```\n现在,你说:")
if identity_end == -1:
# 如果找不到特定标记,尝试在第一个段落后插入
first_double_newline = result.find("\n\n")
if first_double_newline != -1:
# 在第一个双换行后插入
result = f"{result[:first_double_newline + 2]}{tool_history}\n{result[first_double_newline + 2:]}"
else:
# 如果找不到合适的位置,添加到开头
result = f"{tool_history}\n\n{result}"
else:
# 在找到的位置插入
result = f"{result[:identity_end]}\n{tool_history}\n{result[identity_end:]}"
return result
@@ -195,6 +149,11 @@ global_prompt_manager = PromptManager()
class Prompt(str):
template: str
name: Optional[str]
args: List[str]
_args: List[Any]
_kwargs: Dict[str, Any]
# 临时标记,作为类常量
_TEMP_LEFT_BRACE = "__ESCAPED_LEFT_BRACE__"
_TEMP_RIGHT_BRACE = "__ESCAPED_RIGHT_BRACE__"
@@ -215,7 +174,7 @@ class Prompt(str):
"""将临时标记还原为实际的花括号字符"""
return template.replace(Prompt._TEMP_LEFT_BRACE, "{").replace(Prompt._TEMP_RIGHT_BRACE, "}")
def __new__(cls, fstr, name: Optional[str] = None, args: Union[List[Any], tuple[Any, ...]] = None, **kwargs):
def __new__(cls, fstr, name: Optional[str] = None, args: Optional[Union[List[Any], tuple[Any, ...]]] = None, **kwargs):
# 如果传入的是元组,转换为列表
if isinstance(args, tuple):
args = list(args)
@@ -251,7 +210,7 @@ class Prompt(str):
@classmethod
async def create_async(
cls, fstr, name: Optional[str] = None, args: Union[List[Any], tuple[Any, ...]] = None, **kwargs
cls, fstr, name: Optional[str] = None, args: Optional[Union[List[Any], tuple[Any, ...]]] = None, **kwargs
):
"""异步创建Prompt实例"""
prompt = cls(fstr, name, args, **kwargs)
@@ -260,7 +219,9 @@ class Prompt(str):
return prompt
@classmethod
def _format_template(cls, template, args: List[Any] = None, kwargs: Dict[str, Any] = None) -> str:
def _format_template(cls, template, args: Optional[List[Any]] = None, kwargs: Optional[Dict[str, Any]] = None) -> str:
if kwargs is None:
kwargs = {}
# 预处理模板中的转义花括号
processed_template = cls._process_escaped_braces(template)

View File

@@ -4,7 +4,7 @@ import hashlib
from pathlib import Path
import numpy as np
import faiss
from typing import Any, Dict, Optional, Union
from typing import Any, Dict, Optional, Union, List
from src.common.logger import get_logger
from src.llm_models.utils_model import LLMRequest
from src.config.config import global_config, model_config
@@ -141,7 +141,7 @@ class CacheManager:
# 步骤 2a: L1 语义缓存 (FAISS)
if query_embedding is not None and self.l1_vector_index.ntotal > 0:
faiss.normalize_L2(query_embedding)
distances, indices = self.l1_vector_index.search(query_embedding, 1)
distances, indices = self.l1_vector_index.search(query_embedding, 1) # type: ignore
if indices.size > 0 and distances[0][0] > 0.75: # IP 越大越相似
hit_index = indices[0][0]
l1_hit_key = self.l1_vector_id_to_key.get(hit_index)
@@ -348,4 +348,64 @@ class CacheManager:
logger.info(f"清理了 {len(expired_keys)} 个过期的L1缓存条目")
# 全局实例
tool_cache = CacheManager()
tool_cache = CacheManager()
import inspect
import time
def wrap_tool_executor():
"""
包装工具执行器以添加缓存功能
这个函数应该在系统启动时被调用一次
"""
from src.plugin_system.core.tool_use import ToolExecutor
from src.plugin_system.apis.tool_api import get_tool_instance
original_execute = ToolExecutor.execute_tool_call
async def wrapped_execute_tool_call(self, tool_call, tool_instance=None):
if not tool_instance:
tool_instance = get_tool_instance(tool_call.func_name)
if not tool_instance or not tool_instance.enable_cache:
return await original_execute(self, tool_call, tool_instance)
try:
tool_file_path = inspect.getfile(tool_instance.__class__)
semantic_query = None
if tool_instance.semantic_cache_query_key:
semantic_query = tool_call.args.get(tool_instance.semantic_cache_query_key)
cached_result = await tool_cache.get(
tool_name=tool_call.func_name,
function_args=tool_call.args,
tool_file_path=tool_file_path,
semantic_query=semantic_query
)
if cached_result:
logger.info(f"{getattr(self, 'log_prefix', '')}使用缓存结果,跳过工具 {tool_call.func_name} 执行")
return cached_result
except Exception as e:
logger.error(f"{getattr(self, 'log_prefix', '')}检查工具缓存时出错: {e}")
result = await original_execute(self, tool_call, tool_instance)
try:
tool_file_path = inspect.getfile(tool_instance.__class__)
semantic_query = None
if tool_instance.semantic_cache_query_key:
semantic_query = tool_call.args.get(tool_instance.semantic_cache_query_key)
await tool_cache.set(
tool_name=tool_call.func_name,
function_args=tool_call.args,
tool_file_path=tool_file_path,
data=result,
ttl=tool_instance.cache_ttl,
semantic_query=semantic_query
)
except Exception as e:
logger.error(f"{getattr(self, 'log_prefix', '')}设置工具缓存时出错: {e}")
return result
ToolExecutor.execute_tool_call = wrapped_execute_tool_call

View File

@@ -1,405 +0,0 @@
"""工具执行历史记录模块"""
import time
from datetime import datetime
from typing import Any, Dict, List, Optional, Union
import json
from pathlib import Path
import inspect
from .logger import get_logger
from src.config.config import global_config
from src.common.cache_manager import tool_cache
logger = get_logger("tool_history")
class ToolHistoryManager:
"""工具执行历史记录管理器"""
_instance = None
_initialized = False
def __new__(cls):
if cls._instance is None:
cls._instance = super().__new__(cls)
return cls._instance
def __init__(self):
if not self._initialized:
self._history: List[Dict[str, Any]] = []
self._initialized = True
self._data_dir = Path("data/tool_history")
self._data_dir.mkdir(parents=True, exist_ok=True)
self._history_file = self._data_dir / "tool_history.jsonl"
self._load_history()
def _save_history(self):
"""保存所有历史记录到文件"""
try:
with self._history_file.open("w", encoding="utf-8") as f:
for record in self._history:
f.write(json.dumps(record, ensure_ascii=False) + "\n")
except Exception as e:
logger.error(f"保存工具调用记录失败: {e}")
def _save_record(self, record: Dict[str, Any]):
"""保存单条记录到文件"""
try:
with self._history_file.open("a", encoding="utf-8") as f:
f.write(json.dumps(record, ensure_ascii=False) + "\n")
except Exception as e:
logger.error(f"保存工具调用记录失败: {e}")
def _clean_expired_records(self):
"""清理已过期的记录"""
original_count = len(self._history)
self._history = [record for record in self._history if record.get("ttl_count", 0) < record.get("ttl", 5)]
cleaned_count = original_count - len(self._history)
if cleaned_count > 0:
logger.info(f"清理了 {cleaned_count} 条过期的工具历史记录,剩余 {len(self._history)}")
self._save_history()
else:
logger.debug("没有需要清理的过期工具历史记录")
def record_tool_call(self,
tool_name: str,
args: Dict[str, Any],
result: Any,
execution_time: float,
status: str,
chat_id: Optional[str] = None,
ttl: int = 5):
"""记录工具调用
Args:
tool_name: 工具名称
args: 工具调用参数
result: 工具返回结果
execution_time: 执行时间(秒)
status: 执行状态("completed""error")
chat_id: 聊天ID与ChatManager中的chat_id对应用于标识群聊或私聊会话
ttl: 该记录的生命周期值插入提示词多少次后删除默认为5
"""
# 检查是否启用历史记录且ttl大于0
if not global_config.tool.history.enable_history or ttl <= 0:
return
# 先清理过期记录
self._clean_expired_records()
try:
# 创建记录
record = {
"tool_name": tool_name,
"timestamp": datetime.now().isoformat(),
"arguments": self._sanitize_args(args),
"result": self._sanitize_result(result),
"execution_time": execution_time,
"status": status,
"chat_id": chat_id,
"ttl": ttl,
"ttl_count": 0
}
# 添加到内存中的历史记录
self._history.append(record)
# 保存到文件
self._save_record(record)
if status == "completed":
logger.info(f"工具 {tool_name} 调用完成,耗时:{execution_time:.2f}s")
else:
logger.error(f"工具 {tool_name} 调用失败:{result}")
except Exception as e:
logger.error(f"记录工具调用时发生错误: {e}")
def _sanitize_args(self, args: Dict[str, Any]) -> Dict[str, Any]:
"""清理参数中的敏感信息"""
sensitive_keys = ['api_key', 'token', 'password', 'secret']
sanitized = args.copy()
def _sanitize_value(value):
if isinstance(value, dict):
return {k: '***' if k.lower() in sensitive_keys else _sanitize_value(v)
for k, v in value.items()}
return value
return {k: '***' if k.lower() in sensitive_keys else _sanitize_value(v)
for k, v in sanitized.items()}
def _sanitize_result(self, result: Any) -> Any:
"""清理结果中的敏感信息"""
if isinstance(result, dict):
return self._sanitize_args(result)
return result
def _load_history(self):
"""加载历史记录文件"""
try:
if self._history_file.exists():
self._history = []
with self._history_file.open("r", encoding="utf-8") as f:
for line in f:
try:
record = json.loads(line)
if record.get("ttl_count", 0) < record.get("ttl", 5): # 只加载未过期的记录
self._history.append(record)
except json.JSONDecodeError:
continue
logger.info(f"成功加载了 {len(self._history)} 条历史记录")
except Exception as e:
logger.error(f"加载历史记录失败: {e}")
def query_history(self,
tool_names: Optional[List[str]] = None,
start_time: Optional[Union[datetime, str]] = None,
end_time: Optional[Union[datetime, str]] = None,
chat_id: Optional[str] = None,
limit: Optional[int] = None,
status: Optional[str] = None) -> List[Dict[str, Any]]:
"""查询工具调用历史
Args:
tool_names: 工具名称列表,为空则查询所有工具
start_time: 开始时间可以是datetime对象或ISO格式字符串
end_time: 结束时间可以是datetime对象或ISO格式字符串
chat_id: 聊天ID与ChatManager中的chat_id对应用于查询特定群聊或私聊的历史记录
limit: 返回记录数量限制
status: 执行状态筛选("completed""error")
Returns:
符合条件的历史记录列表
"""
# 先清理过期记录
self._clean_expired_records()
def _parse_time(time_str: Optional[Union[datetime, str]]) -> Optional[datetime]:
if isinstance(time_str, datetime):
return time_str
elif isinstance(time_str, str):
return datetime.fromisoformat(time_str)
return None
filtered_history = self._history
# 按工具名筛选
if tool_names:
filtered_history = [
record for record in filtered_history
if record["tool_name"] in tool_names
]
# 按时间范围筛选
start_dt = _parse_time(start_time)
end_dt = _parse_time(end_time)
if start_dt:
filtered_history = [
record for record in filtered_history
if datetime.fromisoformat(record["timestamp"]) >= start_dt
]
if end_dt:
filtered_history = [
record for record in filtered_history
if datetime.fromisoformat(record["timestamp"]) <= end_dt
]
# 按聊天ID筛选
if chat_id:
filtered_history = [
record for record in filtered_history
if record.get("chat_id") == chat_id
]
# 按状态筛选
if status:
filtered_history = [
record for record in filtered_history
if record["status"] == status
]
# 应用数量限制
if limit:
filtered_history = filtered_history[-limit:]
return filtered_history
def get_recent_history_prompt(self,
limit: Optional[int] = None,
chat_id: Optional[str] = None) -> str:
"""
获取最近工具调用历史的提示词
Args:
limit: 返回的历史记录数量,如果不提供则使用配置中的max_history
chat_id: 会话ID用于只获取当前会话的历史
Returns:
格式化的历史记录提示词
"""
# 检查是否启用历史记录
if not global_config.tool.history.enable_history:
return ""
# 使用配置中的最大历史记录数
if limit is None:
limit = global_config.tool.history.max_history
recent_history = self.query_history(
chat_id=chat_id,
limit=limit
)
if not recent_history:
return ""
prompt = "\n工具执行历史:\n"
needs_save = False
updated_history = []
for record in recent_history:
# 增加ttl计数
record["ttl_count"] = record.get("ttl_count", 0) + 1
needs_save = True
# 如果未超过ttl则添加到提示词中
if record["ttl_count"] < record.get("ttl", 5):
# 提取结果中的name和content
result = record['result']
if isinstance(result, dict):
name = result.get('name', record['tool_name'])
content = result.get('content', str(result))
else:
name = record['tool_name']
content = str(result)
# 格式化内容,去除多余空白和换行
content = content.strip().replace('\n', ' ')
# 如果内容太长则截断
if len(content) > 200:
content = content[:200] + "..."
prompt += f"{name}: \n{content}\n\n"
updated_history.append(record)
# 更新历史记录并保存
if needs_save:
self._history = updated_history
self._save_history()
return prompt
def clear_history(self):
"""清除历史记录"""
self._history.clear()
self._save_history()
logger.info("工具调用历史记录已清除")
def wrap_tool_executor():
"""
包装工具执行器以添加历史记录和缓存功能
这个函数应该在系统启动时被调用一次
"""
from src.plugin_system.core.tool_use import ToolExecutor
from src.plugin_system.apis.tool_api import get_tool_instance
original_execute = ToolExecutor.execute_tool_call
history_manager = ToolHistoryManager()
async def wrapped_execute_tool_call(self, tool_call, tool_instance=None):
start_time = time.time()
# 确保我们有 tool_instance
if not tool_instance:
tool_instance = get_tool_instance(tool_call.func_name)
# 如果没有 tool_instance就无法进行缓存检查直接执行
if not tool_instance:
result = await original_execute(self, tool_call, None)
execution_time = time.time() - start_time
history_manager.record_tool_call(
tool_name=tool_call.func_name,
args=tool_call.args,
result=result,
execution_time=execution_time,
status="completed",
chat_id=getattr(self, 'chat_id', None),
ttl=5 # Default TTL
)
return result
# 新的缓存逻辑
if tool_instance.enable_cache:
try:
tool_file_path = inspect.getfile(tool_instance.__class__)
semantic_query = None
if tool_instance.semantic_cache_query_key:
semantic_query = tool_call.args.get(tool_instance.semantic_cache_query_key)
cached_result = await tool_cache.get(
tool_name=tool_call.func_name,
function_args=tool_call.args,
tool_file_path=tool_file_path,
semantic_query=semantic_query
)
if cached_result:
logger.info(f"{self.log_prefix}使用缓存结果,跳过工具 {tool_call.func_name} 执行")
return cached_result
except Exception as e:
logger.error(f"{self.log_prefix}检查工具缓存时出错: {e}")
try:
result = await original_execute(self, tool_call, tool_instance)
execution_time = time.time() - start_time
# 缓存结果
if tool_instance.enable_cache:
try:
tool_file_path = inspect.getfile(tool_instance.__class__)
semantic_query = None
if tool_instance.semantic_cache_query_key:
semantic_query = tool_call.args.get(tool_instance.semantic_cache_query_key)
await tool_cache.set(
tool_name=tool_call.func_name,
function_args=tool_call.args,
tool_file_path=tool_file_path,
data=result,
ttl=tool_instance.cache_ttl,
semantic_query=semantic_query
)
except Exception as e:
logger.error(f"{self.log_prefix}设置工具缓存时出错: {e}")
# 记录成功的调用
history_manager.record_tool_call(
tool_name=tool_call.func_name,
args=tool_call.args,
result=result,
execution_time=execution_time,
status="completed",
chat_id=getattr(self, 'chat_id', None),
ttl=tool_instance.history_ttl
)
return result
except Exception as e:
execution_time = time.time() - start_time
# 记录失败的调用
history_manager.record_tool_call(
tool_name=tool_call.func_name,
args=tool_call.args,
result=str(e),
execution_time=execution_time,
status="error",
chat_id=getattr(self, 'chat_id', None),
ttl=tool_instance.history_ttl
)
raise
# 替换原始方法
ToolExecutor.execute_tool_call = wrapped_execute_tool_call

View File

@@ -254,7 +254,7 @@ MoFox_Bot(第三方修改版)
try:
await event_manager.trigger_event(EventType.ON_START)
await event_manager.trigger_event(EventType.ON_START,plugin_name="SYSTEM")
init_time = int(1000 * (time.time() - init_start_time))
logger.info(f"初始化完成,神经元放电{init_time}")
except Exception as e:

View File

@@ -1,9 +1,7 @@
from typing import Any, Dict, List, Optional, Type, Union
from datetime import datetime
from typing import Any, Dict, List, Optional, Type
from src.plugin_system.base.base_tool import BaseTool
from src.plugin_system.base.component_types import ComponentType
from src.common.tool_history import ToolHistoryManager
from src.common.logger import get_logger
logger = get_logger("tool_api")
@@ -33,110 +31,4 @@ def get_llm_available_tool_definitions():
from src.plugin_system.core import component_registry
llm_available_tools = component_registry.get_llm_available_tools()
return [(name, tool_class.get_tool_definition()) for name, tool_class in llm_available_tools.items()]
def get_tool_history(
tool_names: Optional[List[str]] = None,
start_time: Optional[Union[datetime, str]] = None,
end_time: Optional[Union[datetime, str]] = None,
chat_id: Optional[str] = None,
limit: Optional[int] = None,
status: Optional[str] = None
) -> List[Dict[str, Any]]:
"""
获取工具调用历史记录
Args:
tool_names: 工具名称列表,为空则查询所有工具
start_time: 开始时间可以是datetime对象或ISO格式字符串
end_time: 结束时间可以是datetime对象或ISO格式字符串
chat_id: 会话ID用于筛选特定会话的调用
limit: 返回记录数量限制
status: 执行状态筛选("completed""error")
Returns:
List[Dict]: 工具调用记录列表,每条记录包含以下字段:
- tool_name: 工具名称
- timestamp: 调用时间
- arguments: 调用参数
- result: 调用结果
- execution_time: 执行时间
- status: 执行状态
- chat_id: 会话ID
"""
history_manager = ToolHistoryManager()
return history_manager.query_history(
tool_names=tool_names,
start_time=start_time,
end_time=end_time,
chat_id=chat_id,
limit=limit,
status=status
)
def get_tool_history_text(
tool_names: Optional[List[str]] = None,
start_time: Optional[Union[datetime, str]] = None,
end_time: Optional[Union[datetime, str]] = None,
chat_id: Optional[str] = None,
limit: Optional[int] = None,
status: Optional[str] = None
) -> str:
"""
获取工具调用历史记录的文本格式
Args:
tool_names: 工具名称列表,为空则查询所有工具
start_time: 开始时间可以是datetime对象或ISO格式字符串
end_time: 结束时间可以是datetime对象或ISO格式字符串
chat_id: 会话ID用于筛选特定会话的调用
limit: 返回记录数量限制
status: 执行状态筛选("completed""error")
Returns:
str: 格式化的工具调用历史记录文本
"""
history = get_tool_history(
tool_names=tool_names,
start_time=start_time,
end_time=end_time,
chat_id=chat_id,
limit=limit,
status=status
)
if not history:
return "没有找到工具调用记录"
text = "工具调用历史记录:\n"
for record in history:
# 提取结果中的name和content
result = record['result']
if isinstance(result, dict):
name = result.get('name', record['tool_name'])
content = result.get('content', str(result))
else:
name = record['tool_name']
content = str(result)
# 格式化内容
content = content.strip().replace('\n', ' ')
if len(content) > 200:
content = content[:200] + "..."
# 格式化时间
timestamp = datetime.fromisoformat(record['timestamp']).strftime("%Y-%m-%d %H:%M:%S")
text += f"[{timestamp}] {name}\n"
text += f"结果: {content}\n\n"
return text
def clear_tool_history() -> None:
"""
清除所有工具调用历史记录
"""
history_manager = ToolHistoryManager()
history_manager.clear_history()
return [(name, tool_class.get_tool_definition()) for name, tool_class in llm_available_tools.items()]

View File

@@ -1,19 +1,20 @@
import asyncio
from typing import List, Dict, Any, Optional
from src.common.logger import get_logger
logger = get_logger("base_event")
class HandlerResult:
"""事件处理器执行结果
所有事件处理器必须返回此类的实例
"""
def __init__(self, success: bool, continue_process: bool, message: str = "", handler_name: str = ""):
def __init__(self, success: bool, continue_process: bool, message: Any = {}, handler_name: str = ""):
self.success = success
self.continue_process = continue_process
self.message = message
self.handler_name = handler_name
def __repr__(self):
return f"HandlerResult(success={self.success}, continue_process={self.continue_process}, message='{self.message}', handler_name='{self.handler_name}')"
@@ -66,13 +67,22 @@ class HandlerResultsCollection:
}
class BaseEvent:
def __init__(self, name: str):
def __init__(
self,
name: str,
allowed_subscribers: List[str]=[],
allowed_triggers: List[str]=[]
):
self.name = name
self.enabled = True
self.allowed_subscribers = allowed_subscribers # 记录事件处理器名
self.allowed_triggers = allowed_triggers # 记录插件名
from src.plugin_system.base.base_events_handler import BaseEventHandler
self.subscribers: List["BaseEventHandler"] = [] # 订阅该事件的事件处理器列表
self.event_handle_lock = asyncio.Lock()
def __name__(self):
return self.name
@@ -88,22 +98,45 @@ class BaseEvent:
if not self.enabled:
return HandlerResultsCollection([])
# 按权重从高到低排序订阅者
# 使用直接属性访问,-1代表自动权重
sorted_subscribers = sorted(self.subscribers, key=lambda h: h.weight if hasattr(h, 'weight') and h.weight != -1 else 0, reverse=True)
results = []
for subscriber in sorted_subscribers:
try:
result = await subscriber.execute(params)
if not result.handler_name:
# 补充handler_name
result.handler_name = subscriber.handler_name if hasattr(subscriber, 'handler_name') else subscriber.__class__.__name__
results.append(result)
except Exception as e:
# 处理执行异常
# 使用锁确保同一个事件不能同时激活多次
async with self.event_handle_lock:
# 按权重从高到低排序订阅者
# 使用直接属性访问,-1代表自动权重
sorted_subscribers = sorted(self.subscribers, key=lambda h: h.weight if hasattr(h, 'weight') and h.weight != -1 else 0, reverse=True)
# 并行执行所有订阅者
tasks = []
for subscriber in sorted_subscribers:
# 为每个订阅者创建执行任务
task = self._execute_subscriber(subscriber, params)
tasks.append(task)
# 等待所有任务完成
results = await asyncio.gather(*tasks, return_exceptions=True)
# 处理执行结果
processed_results = []
for i, result in enumerate(results):
subscriber = sorted_subscribers[i]
handler_name = subscriber.handler_name if hasattr(subscriber, 'handler_name') else subscriber.__class__.__name__
logger.error(f"事件处理器 {handler_name} 执行失败: {e}")
results.append(HandlerResult(False, True, str(e), handler_name))
return HandlerResultsCollection(results)
if isinstance(result, Exception):
# 处理执行异常
logger.error(f"事件处理器 {handler_name} 执行失败: {result}")
processed_results.append(HandlerResult(False, True, str(result), handler_name))
else:
# 正常执行结果
if not result.handler_name:
# 补充handler_name
result.handler_name = handler_name
processed_results.append(result)
return HandlerResultsCollection(processed_results)
async def _execute_subscriber(self, subscriber, params: dict) -> HandlerResult:
"""执行单个订阅者处理器"""
try:
return await subscriber.execute(params)
except Exception as e:
# 异常会在 gather 中捕获,这里直接抛出让 gather 处理
raise e

View File

@@ -50,7 +50,8 @@ class CommandArgs:
self._parsed_args = self._raw_args.split()
return self._parsed_args
@property
def is_empty(self) -> bool:
"""检查参数是否为空
@@ -73,7 +74,8 @@ class CommandArgs:
if 0 <= index < len(args):
return args[index]
return default
@property
def get_first(self, default: str = "") -> str:
"""获取第一个参数
@@ -85,7 +87,7 @@ class CommandArgs:
"""
return self.get_arg(0, default)
def get_remaining(self, start_index: int = 1) -> str:
def get_remaining(self, start_index: int = 0) -> str:
"""获取从指定索引开始的剩余参数字符串
Args:

View File

@@ -2,7 +2,6 @@
事件管理器 - 实现Event和EventHandler的单例管理
提供统一的事件注册、管理和触发接口
"""
from typing import Dict, Type, List, Optional, Any, Union
from threading import Lock
@@ -41,12 +40,18 @@ class EventManager:
self._initialized = True
logger.info("EventManager 单例初始化完成")
def register_event(self, event_name: Union[EventType, str]) -> bool:
def register_event(
self,
event_name: Union[EventType, str],
allowed_subscribers: List[str]=[],
allowed_triggers: List[str]=[]
) -> bool:
"""注册一个新的事件
Args:
event_name Union[EventType, str]: 事件名称
allowed_subscribers: List[str]: 事件订阅者白名单,
allowed_triggers: List[str]: 事件触发插件白名单
Returns:
bool: 注册成功返回True已存在返回False
"""
@@ -54,7 +59,7 @@ class EventManager:
logger.warning(f"事件 {event_name} 已存在,跳过注册")
return False
event = BaseEvent(event_name)
event = BaseEvent(event_name,allowed_subscribers,allowed_triggers)
self._events[event_name] = event
logger.info(f"事件 {event_name} 注册成功")
@@ -211,7 +216,12 @@ class EventManager:
if handler_instance in event.subscribers:
logger.warning(f"事件处理器 {handler_name} 已经订阅了事件 {event_name},跳过重复订阅")
return True
# 白名单检查
if event.allowed_subscribers and handler_name not in event.allowed_subscribers:
logger.warning(f"事件处理器 {handler_name} 不在事件 {event_name} 的订阅者白名单中,无法订阅")
return False
event.subscribers.append(handler_instance)
# 按权重从高到低排序订阅者
@@ -265,11 +275,12 @@ class EventManager:
return {handler.handler_name: handler for handler in event.subscribers}
async def trigger_event(self, event_name: Union[EventType, str], **kwargs) -> Optional[HandlerResultsCollection]:
async def trigger_event(self, event_name: Union[EventType, str], plugin_name: Optional[str]="", **kwargs) -> Optional[HandlerResultsCollection]:
"""触发指定事件
Args:
event_name Union[EventType, str]: 事件名称
plugin_name str: 触发事件的插件名
**kwargs: 传递给处理器的参数
Returns:
@@ -281,7 +292,15 @@ class EventManager:
if event is None:
logger.error(f"事件 {event_name} 不存在,无法触发")
return None
# 插件白名单检查
if event.allowed_triggers and not plugin_name:
logger.warning(f"事件 {event_name} 存在触发者白名单缺少plugin_name无法验证权限已拒绝触发")
return None
elif event.allowed_triggers and plugin_name not in event.allowed_triggers:
logger.warning(f"插件 {plugin_name} 没有权限触发事件 {event_name},已拒绝触发!")
return None
return await event.activate(params)
def init_default_events(self) -> None:
@@ -294,12 +313,11 @@ class EventManager:
EventType.POST_LLM,
EventType.AFTER_LLM,
EventType.POST_SEND,
EventType.AFTER_SEND,
EventType.UNKNOWN
EventType.AFTER_SEND
]
for event_name in default_events:
self.register_event(event_name)
self.register_event(event_name,allowed_triggers=["SYSTEM"])
logger.info("默认事件初始化完成")

View File

@@ -9,9 +9,9 @@ from typing import Callable, Optional
from inspect import iscoroutinefunction
from src.plugin_system.apis.permission_api import permission_api
from src.plugin_system.apis.send_api import send_message
from src.plugin_system.apis.send_api import text_to_stream
from src.plugin_system.apis.logging_api import get_logger
from src.common.message import ChatStream
from src.chat.message_receive.chat_stream import ChatStream
logger = get_logger(__name__)
@@ -37,6 +37,8 @@ def require_permission(permission_node: str, deny_message: Optional[str] = None)
async def async_wrapper(*args, **kwargs):
# 尝试从参数中提取 ChatStream 对象
chat_stream = None
# 首先检查位置参数中的 ChatStream
for arg in args:
if isinstance(arg, ChatStream):
chat_stream = arg
@@ -46,21 +48,31 @@ def require_permission(permission_node: str, deny_message: Optional[str] = None)
if chat_stream is None:
chat_stream = kwargs.get('chat_stream')
# 如果还没找到,检查是否是 PlusCommand 方法调用
if chat_stream is None and args:
# 检查第一个参数是否有 message.chat_stream 属性PlusCommand 实例)
instance = args[0]
if hasattr(instance, 'message') and hasattr(instance.message, 'chat_stream'):
chat_stream = instance.message.chat_stream
if chat_stream is None:
logger.error(f"权限装饰器无法找到 ChatStream 对象,函数: {func.__name__}")
return
# 检查权限
has_permission = permission_api.check_permission(
chat_stream.user_platform,
chat_stream.user_id,
chat_stream.platform,
chat_stream.user_info.user_id,
permission_node
)
if not has_permission:
# 权限不足,发送拒绝消息
message = deny_message or f"❌ 你没有执行此操作的权限\n需要权限: {permission_node}"
await send_message(chat_stream, message)
await text_to_stream(message, chat_stream.stream_id)
# 对于PlusCommand的execute方法需要返回适当的元组
if func.__name__ == 'execute' and hasattr(args[0], 'send_text'):
return False, "权限不足", True
return
# 权限检查通过,执行原函数
@@ -83,13 +95,13 @@ def require_permission(permission_node: str, deny_message: Optional[str] = None)
# 检查权限
has_permission = permission_api.check_permission(
chat_stream.user_platform,
chat_stream.user_id,
chat_stream.platform,
chat_stream.user_info.user_id,
permission_node
)
if not has_permission:
logger.warning(f"用户 {chat_stream.user_platform}:{chat_stream.user_id} 没有权限 {permission_node}")
logger.warning(f"用户 {chat_stream.platform}:{chat_stream.user_info.user_id} 没有权限 {permission_node}")
return
# 权限检查通过,执行原函数
@@ -124,6 +136,8 @@ def require_master(deny_message: Optional[str] = None):
async def async_wrapper(*args, **kwargs):
# 尝试从参数中提取 ChatStream 对象
chat_stream = None
# 首先检查位置参数中的 ChatStream
for arg in args:
if isinstance(arg, ChatStream):
chat_stream = arg
@@ -133,20 +147,28 @@ def require_master(deny_message: Optional[str] = None):
if chat_stream is None:
chat_stream = kwargs.get('chat_stream')
# 如果还没找到,检查是否是 PlusCommand 方法调用
if chat_stream is None and args:
# 检查第一个参数是否有 message.chat_stream 属性PlusCommand 实例)
instance = args[0]
if hasattr(instance, 'message') and hasattr(instance.message, 'chat_stream'):
chat_stream = instance.message.chat_stream
if chat_stream is None:
logger.error(f"Master权限装饰器无法找到 ChatStream 对象,函数: {func.__name__}")
return
# 检查是否为Master用户
is_master = permission_api.is_master(
chat_stream.user_platform,
chat_stream.user_id
chat_stream.platform,
chat_stream.user_info.user_id
)
if not is_master:
# 权限不足,发送拒绝消息
message = deny_message or "❌ 此操作仅限Master用户执行"
await send_message(chat_stream, message)
await text_to_stream(message, chat_stream.stream_id)
if func.__name__ == 'execute' and hasattr(args[0], 'send_text'):
return False, "需要Master权限", True
return
# 权限检查通过,执行原函数
@@ -169,12 +191,12 @@ def require_master(deny_message: Optional[str] = None):
# 检查是否为Master用户
is_master = permission_api.is_master(
chat_stream.user_platform,
chat_stream.user_id
chat_stream.platform,
chat_stream.user_info.user_id
)
if not is_master:
logger.warning(f"用户 {chat_stream.user_platform}:{chat_stream.user_id} 不是Master用户")
logger.warning(f"用户 {chat_stream.platform}:{chat_stream.user_info.user_id} 不是Master用户")
return
# 权限检查通过,执行原函数
@@ -209,8 +231,8 @@ class PermissionChecker:
bool: 是否拥有权限
"""
return permission_api.check_permission(
chat_stream.user_platform,
chat_stream.user_id,
chat_stream.platform,
chat_stream.user_info.user_id,
permission_node
)
@@ -226,8 +248,8 @@ class PermissionChecker:
bool: 是否为Master用户
"""
return permission_api.is_master(
chat_stream.user_platform,
chat_stream.user_id
chat_stream.platform,
chat_stream.user_info.user_id
)
@staticmethod
@@ -248,7 +270,7 @@ class PermissionChecker:
if not has_permission:
message = deny_message or f"❌ 你没有执行此操作的权限\n需要权限: {permission_node}"
await send_message(chat_stream, message)
await text_to_stream(message, chat_stream.stream_id)
return has_permission
@@ -269,6 +291,6 @@ class PermissionChecker:
if not is_master:
message = deny_message or "❌ 此操作仅限Master用户执行"
await send_message(chat_stream, message)
await text_to_stream(message, chat_stream.stream_id)
return is_master

View File

@@ -5,52 +5,32 @@
from typing import Tuple
from src.common.logger import get_logger
from src.plugin_system import BaseCommand
from src.plugin_system.base.plus_command import PlusCommand
from src.plugin_system.base.command_args import CommandArgs
from src.plugin_system.utils.permission_decorators import require_permission
from ..services.manager import get_qzone_service, get_config_getter
logger = get_logger("MaiZone.SendFeedCommand")
class SendFeedCommand(BaseCommand):
class SendFeedCommand(PlusCommand):
"""
响应用户通过 `/send_feed` 命令发送说说的请求。
"""
command_name: str = "send_feed"
command_description: str = "发送一条QQ空间说说"
command_pattern: str = r"^/send_feed(?:\s+(?P<topic>.*))?$"
command_help: str = "使用 /send_feed [主题] 来发送一条说说"
command_aliases = ["发空间"]
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
def _check_permission(self) -> bool:
"""检查当前用户是否有权限执行此命令"""
user_id = self.message.message_info.user_info.user_id
if not user_id:
return False
get_config = get_config_getter()
permission_list = get_config("send.permission", [])
permission_type = get_config("send.permission_type", "whitelist")
if not isinstance(permission_list, list):
return False
if permission_type == 'whitelist':
return user_id in permission_list
elif permission_type == 'blacklist':
return user_id not in permission_list
return False
async def execute(self) -> Tuple[bool, str, bool]:
@require_permission("plugin.send.permission")
async def execute(self, args: CommandArgs) -> Tuple[bool, str, bool]:
"""
执行命令的核心逻辑。
"""
if not self._check_permission():
await self.send_text("抱歉,你没有权限使用这个命令哦。")
return False, "权限不足", True
topic = self.matched_groups.get("topic", "")
topic = args.get_remaining()
stream_id = self.message.chat_stream.stream_id
await self.send_text(f"收到!正在为你生成关于“{topic or '随机'}”的说说,请稍候...")

View File

@@ -13,6 +13,7 @@ from src.plugin_system import (
register_plugin
)
from src.plugin_system.base.config_types import ConfigField
from src.plugin_system.apis.permission_api import permission_api
from .actions.read_feed_action import ReadFeedAction
from .actions.send_feed_action import SendFeedAction
@@ -82,7 +83,12 @@ class MaiZoneRefactoredPlugin(BasePlugin):
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
permission_api.register_permission_node(
"plugin.send.permission",
"是否可以使用机器人发送说说",
"maiZone",
False
)
content_service = ContentService(self.get_config)
image_service = ImageService(self.get_config)
cookie_service = CookieService(self.get_config)
@@ -102,5 +108,5 @@ class MaiZoneRefactoredPlugin(BasePlugin):
return [
(SendFeedAction.get_action_info(), SendFeedAction),
(ReadFeedAction.get_action_info(), ReadFeedAction),
(SendFeedCommand.get_command_info(), SendFeedCommand),
(SendFeedCommand.get_plus_command_info(), SendFeedCommand),
]

View File

@@ -16,6 +16,7 @@ from src.plugin_system.apis.permission_api import permission_api
from src.plugin_system.apis.logging_api import get_logger
from src.plugin_system.base.component_types import PlusCommandInfo, ChatType
from src.plugin_system.base.config_types import ConfigField
from src.plugin_system.utils.permission_decorators import require_permission, require_master, PermissionChecker
logger = get_logger("Permission")
@@ -46,70 +47,38 @@ class PermissionCommand(PlusCommand):
"permission_manager",
True
)
async def execute(self, args: CommandArgs) -> Tuple[bool, Optional[str], bool]:
"""执行权限管理命令"""
if args.is_empty():
if args.is_empty:
await self._show_help()
return True, "显示帮助信息", True
subcommand = args.get_first().lower()
subcommand = args.get_first.lower()
remaining_args = args.get_args()[1:] # 获取除第一个参数外的所有参数
chat_stream = self.message.chat_stream
# 检查基本查看权限
can_view = permission_api.check_permission(
chat_stream.platform,
chat_stream.user_info.user_id,
"plugin.permission.view"
) or permission_api.is_master(chat_stream.platform, chat_stream.user_info.user_id)
# 检查管理权限
can_manage = permission_api.check_permission(
chat_stream.platform,
chat_stream.user_info.user_id,
"plugin.permission.manage"
) or permission_api.is_master(chat_stream.platform, chat_stream.user_info.user_id)
if subcommand in ["grant", "授权", "give"]:
if not can_manage:
await self.send_text("❌ 你没有权限管理的权限")
return True, "权限不足", True
await self._grant_permission(chat_stream, remaining_args)
return True, "执行授权命令", True
elif subcommand in ["revoke", "撤销", "remove"]:
if not can_manage:
await self.send_text("❌ 你没有权限管理的权限")
return True, "权限不足", True
await self._revoke_permission(chat_stream, remaining_args)
return True, "执行撤销命令", True
elif subcommand in ["list", "列表", "ls"]:
if not can_view:
await self.send_text("❌ 你没有查看权限的权限")
return True, "权限不足", True
await self._list_permissions(chat_stream, remaining_args)
return True, "执行列表命令", True
elif subcommand in ["check", "检查"]:
if not can_view:
await self.send_text("❌ 你没有查看权限的权限")
return True, "权限不足", True
await self._check_permission(chat_stream, remaining_args)
return True, "执行检查命令", True
elif subcommand in ["nodes", "节点"]:
if not can_view:
await self.send_text("❌ 你没有查看权限的权限")
return True, "权限不足", True
await self._list_nodes(chat_stream, remaining_args)
return True, "执行节点命令", True
elif subcommand in ["allnodes", "全部节点", "all"]:
if not can_view:
await self.send_text("❌ 你没有查看权限的权限")
return True, "权限不足", True
await self._list_all_nodes_with_description(chat_stream)
return True, "执行全部节点命令", True
@@ -149,11 +118,18 @@ class PermissionCommand(PlusCommand):
await self.send_text(help_text)
def _parse_user_mention(self, mention: str) -> Optional[str]:
"""解析用户提及提取QQ号"""
"""解析用户提及提取QQ号
支持的格式:
- @<用户名:QQ号> 格式
- [CQ:at,qq=QQ号] 格式
- 直接的QQ号
"""
# 匹配 @<用户名:QQ号> 格式提取QQ号
at_match = re.search(r'@<[^:]+:(\d+)>', mention)
if at_match:
return at_match.group(1)
# 直接是数字
if mention.isdigit():
@@ -161,62 +137,94 @@ class PermissionCommand(PlusCommand):
return None
@staticmethod
def parse_user_from_args(args: CommandArgs, index: int = 0) -> Optional[str]:
"""从CommandArgs中解析用户ID
Args:
args: 命令参数对象
index: 参数索引默认为0第一个参数
Returns:
Optional[str]: 解析出的用户ID如果解析失败返回None
"""
if index >= args.count():
return None
mention = args.get_arg(index)
# 匹配 @<用户名:QQ号> 格式提取QQ号
at_match = re.search(r'@<[^:]+:(\d+)>', mention)
if at_match:
return at_match.group(1)
# 匹配传统的 [CQ:at,qq=数字] 格式
cq_match = re.search(r'\[CQ:at,qq=(\d+)\]', mention)
if cq_match:
return cq_match.group(1)
# 直接是数字
if mention.isdigit():
return mention
return None
@require_permission("plugin.permission.manage", "❌ 你没有权限管理的权限")
async def _grant_permission(self, chat_stream, args: List[str]):
"""授权用户权限"""
if len(args) < 2:
await self.send_text("❌ 用法: /permission grant <@用户|QQ号> <权限节点>")
return
user_mention = args[0]
permission_node = args[1]
# 解析用户ID
user_id = self._parse_user_mention(user_mention)
# 解析用户ID - 使用新的解析方法
user_id = self._parse_user_mention(args[0])
if not user_id:
await self.send_text("❌ 无效的用户格式,请使用 @用户 或直接输入QQ号")
await self.send_text("❌ 无效的用户格式,请使用 @<用户名:QQ号> 或直接输入QQ号")
return
permission_node = args[1]
# 执行授权
success = permission_api.grant_permission(chat_stream.platform, user_id, permission_node)
if success:
await self.send_text(f"✅ 已授权用户 {user_id} 权限节点 {permission_node}")
await self.send_text(f"✅ 已授权用户 {user_id} 权限节点 `{permission_node}`")
else:
await self.send_text("❌ 授权失败,请检查权限节点是否存在")
@require_permission("plugin.permission.manage", "❌ 你没有权限管理的权限")
async def _revoke_permission(self, chat_stream, args: List[str]):
"""撤销用户权限"""
if len(args) < 2:
await self.send_text("❌ 用法: /permission revoke <@用户|QQ号> <权限节点>")
return
user_mention = args[0]
permission_node = args[1]
# 解析用户ID
user_id = self._parse_user_mention(user_mention)
# 解析用户ID - 使用新的解析方法
user_id = self._parse_user_mention(args[0])
if not user_id:
await self.send_text("❌ 无效的用户格式,请使用 @用户 或直接输入QQ号")
await self.send_text("❌ 无效的用户格式,请使用 @<用户名:QQ号> 或直接输入QQ号")
return
permission_node = args[1]
# 执行撤销
success = permission_api.revoke_permission(chat_stream.platform, user_id, permission_node)
if success:
await self.send_text(f"✅ 已撤销用户 {user_id} 权限节点 {permission_node}")
await self.send_text(f"✅ 已撤销用户 {user_id} 权限节点 `{permission_node}`")
else:
await self.send_text("❌ 撤销失败,请检查权限节点是否存在")
@require_permission("plugin.permission.view", "❌ 你没有查看权限的权限")
async def _list_permissions(self, chat_stream, args: List[str]):
"""列出用户权限"""
target_user_id = None
if args:
# 指定了用户
user_mention = args[0]
target_user_id = self._parse_user_mention(user_mention)
# 指定了用户 - 使用新的解析方法
target_user_id = self._parse_user_mention(args[0])
if not target_user_id:
await self.send_text("❌ 无效的用户格式,请使用 @用户 或直接输入QQ号")
await self.send_text("❌ 无效的用户格式,请使用 @<用户名:QQ号> 或直接输入QQ号")
return
else:
# 查看自己的权限
@@ -229,45 +237,46 @@ class PermissionCommand(PlusCommand):
permissions = permission_api.get_user_permissions(chat_stream.platform, target_user_id)
if is_master:
response = f"👑 用户 {target_user_id} 是Master用户拥有所有权限"
response = f"👑 用户 `{target_user_id}` 是Master用户拥有所有权限"
else:
if permissions:
perm_list = "\n".join([f"{perm}" for perm in permissions])
response = f"📋 用户 {target_user_id} 拥有的权限:\n{perm_list}"
perm_list = "\n".join([f"`{perm}`" for perm in permissions])
response = f"📋 用户 `{target_user_id}` 拥有的权限:\n{perm_list}"
else:
response = f"📋 用户 {target_user_id} 没有任何权限"
response = f"📋 用户 `{target_user_id}` 没有任何权限"
await self.send_text(response)
@require_permission("plugin.permission.view", "❌ 你没有查看权限的权限")
async def _check_permission(self, chat_stream, args: List[str]):
"""检查用户权限"""
if len(args) < 2:
await self.send_text("❌ 用法: /permission check <@用户|QQ号> <权限节点>")
return
user_mention = args[0]
permission_node = args[1]
# 解析用户ID
user_id = self._parse_user_mention(user_mention)
# 解析用户ID - 使用新的解析方法
user_id = self._parse_user_mention(args[0])
if not user_id:
await self.send_text("❌ 无效的用户格式,请使用 @用户 或直接输入QQ号")
await self.send_text("❌ 无效的用户格式,请使用 @<用户名:QQ号> 或直接输入QQ号")
return
permission_node = args[1]
# 检查权限
has_permission = permission_api.check_permission(chat_stream.platform, user_id, permission_node)
is_master = permission_api.is_master(chat_stream.platform, user_id)
if has_permission:
if is_master:
response = f"✅ 用户 {user_id} 拥有权限 {permission_node}Master用户"
response = f"✅ 用户 `{user_id}` 拥有权限 `{permission_node}`Master用户"
else:
response = f"✅ 用户 {user_id} 拥有权限 {permission_node}"
response = f"✅ 用户 `{user_id}` 拥有权限 `{permission_node}`"
else:
response = f"❌ 用户 {user_id} 没有权限 {permission_node}"
response = f"❌ 用户 `{user_id}` 没有权限 `{permission_node}`"
await self.send_text(response)
@require_permission("plugin.permission.view", "❌ 你没有查看权限的权限")
async def _list_nodes(self, chat_stream, args: List[str]):
"""列出权限节点"""
plugin_name = args[0] if args else None
@@ -300,6 +309,7 @@ class PermissionCommand(PlusCommand):
await self.send_text(response)
@require_permission("plugin.permission.view", "❌ 你没有查看权限的权限")
async def _list_all_nodes_with_description(self, chat_stream):
"""列出所有插件的权限节点(带详细描述)"""
# 获取所有权限节点

View File

@@ -1,7 +1,7 @@
[inner]
version = "6.5.7"
#----以下是给开发人员阅读的,如果你只是部署了麦麦,不需要阅读----
#----以下是给开发人员阅读的,如果你只是部署了MoFox-Bot,不需要阅读----
#如果你想要修改配置文件请递增version的值
#如果新增项目请阅读src/config/official_configs.py中的说明
#
@@ -9,7 +9,7 @@ version = "6.5.7"
# 主版本号MMC版本更新
# 次版本号:配置文件内容大更新
# 修订号:配置文件内容小更新
#----以上是给开发人员阅读的,如果你只是部署了麦麦,不需要阅读----
#----以上是给开发人员阅读的,如果你只是部署了MoFox-Bot,不需要阅读----
[database]# 数据库配置
database_type = "sqlite" # 数据库类型,支持 "sqlite" 或 "mysql"
@@ -48,9 +48,9 @@ master_users = []# ["qq", "123456789"], # 示例QQ平台的Master用户
[bot]
platform = "qq"
qq_account = 1145141919810 # 麦麦的QQ账号
nickname = "麦麦" # 麦麦的昵称
alias_names = ["麦叠", "牢麦"] # 麦麦的别名
qq_account = 1145141919810 # MoFox-Bot的QQ账号
nickname = "MoFox-Bot" # MoFox-Bot的昵称
alias_names = ["麦叠", "牢麦"] # MoFox-Bot的别名
[command]
command_prefixes = ['/', '!', '.', '#']
@@ -64,7 +64,7 @@ personality_side = "用一句话或几句话描述人格的侧面特质"
# 可以描述外貌,性别,身高,职业,属性等等描述
identity = "年龄为19岁,是女孩子,身高为160cm,有黑色的短发"
# 描述麦麦说话的表达风格,表达习惯,如要修改,可以酌情新增内容
# 描述MoFox-Bot说话的表达风格,表达习惯,如要修改,可以酌情新增内容
reply_style = "回复可以简短一些。可以参考贴吧,知乎和微博的回复风格,回复不要浮夸,不要用夸张修辞,平淡一些。"
#回复的Prompt模式选择s4u为原有s4u样式normal为0.9之前的模式
@@ -109,12 +109,12 @@ learning_strength = 1.0
[chat] #麦麦的聊天通用设置
[chat] #MoFox-Bot的聊天通用设置
focus_value = 1
# 麦麦的专注思考能力越高越容易专注可能消耗更多token
# MoFox-Bot的专注思考能力越高越容易专注可能消耗更多token
# 专注时能更好把握发言时机,能够进行持久的连续对话
talk_frequency = 1 # 麦麦活跃度,越高,麦麦回复越频繁
talk_frequency = 1 # MoFox-Bot活跃度越高MoFox-Bot回复越频繁
# 强制私聊专注模式
force_focus_private = false # 是否强制私聊进入专注模式,开启后私聊将始终保持专注状态
@@ -124,7 +124,7 @@ group_chat_mode = "auto" # 群聊聊天模式auto-自动切换normal-强
max_context_size = 25 # 上下文长度
thinking_timeout = 40 # 麦麦一次回复最长思考规划时间超过这个时间的思考会放弃往往是api反应太慢
thinking_timeout = 40 # MoFox-Bot一次回复最长思考规划时间超过这个时间的思考会放弃往往是api反应太慢
replyer_random_probability = 0.5 # 首要replyer模型被选择的概率
mentioned_bot_inevitable_reply = true # 提及 bot 必然回复
@@ -181,7 +181,7 @@ delta_sigma = 120 # 正态分布的标准差,控制时间间隔的随机程度
[relationship]
enable_relationship = true # 是否启用关系系统
relation_frequency = 1 # 关系频率,麦麦构建关系的频率
relation_frequency = 1 # 关系频率,MoFox-Bot构建关系的频率
[message_receive]
@@ -238,30 +238,30 @@ enable_mood = true # 是否启用情绪系统
mood_update_threshold = 1 # 情绪更新阈值,越高,更新越慢
[emoji]
emoji_chance = 0.6 # 麦麦激活表情包动作的概率
emoji_chance = 0.6 # MoFox-Bot激活表情包动作的概率
emoji_activate_type = "llm" # 表情包激活类型可选randomllm ; random下表情包动作随机启用llm下表情包动作根据llm判断是否启用
max_reg_num = 60 # 表情包最大注册数量
do_replace = true # 开启则在达到最大数量时删除(替换)表情包,关闭则达到最大数量时不会继续收集表情包
check_interval = 10 # 检查表情包(注册,破损,删除)的时间间隔(分钟)
steal_emoji = true # 是否偷取表情包,让麦麦可以将一些表情包据为己有
steal_emoji = true # 是否偷取表情包,让MoFox-Bot可以将一些表情包据为己有
content_filtration = false # 是否启用表情包过滤,只有符合该要求的表情包才会被保存
filtration_prompt = "符合公序良俗" # 表情包过滤要求,只有符合该要求的表情包才会被保存
enable_emotion_analysis = false # 是否启用表情包感情关键词二次识别,启用后表情包在第一次识别完毕后将送入第二次大模型识别来总结感情关键词,并构建进回复和决策器的上下文消息中
[memory]
enable_memory = true # 是否启用记忆系统
memory_build_interval = 600 # 记忆构建间隔 单位秒 间隔越低,麦麦学习越多,但是冗余信息也会增多
memory_build_interval = 600 # 记忆构建间隔 单位秒 间隔越低,MoFox-Bot学习越多,但是冗余信息也会增多
memory_build_distribution = [6.0, 3.0, 0.6, 32.0, 12.0, 0.4] # 记忆构建分布参数分布1均值标准差权重分布2均值标准差权重
memory_build_sample_num = 8 # 采样数量,数值越高记忆采样次数越多
memory_build_sample_length = 30 # 采样长度,数值越高一段记忆内容越丰富
memory_compress_rate = 0.1 # 记忆压缩率 控制记忆精简程度 建议保持默认,调高可以获得更多信息,但是冗余信息也会增多
forget_memory_interval = 3000 # 记忆遗忘间隔 单位秒 间隔越低,麦麦遗忘越频繁,记忆更精简,但更难学习
forget_memory_interval = 3000 # 记忆遗忘间隔 单位秒 间隔越低,MoFox-Bot遗忘越频繁,记忆更精简,但更难学习
memory_forget_time = 48 #多长时间后的记忆会被遗忘 单位小时
memory_forget_percentage = 0.008 # 记忆遗忘比例 控制记忆遗忘程度 越大遗忘越多 建议保持默认
consolidate_memory_interval = 1000 # 记忆整合间隔 单位秒 间隔越低,麦麦整合越频繁,记忆更精简
consolidate_memory_interval = 1000 # 记忆整合间隔 单位秒 间隔越低,MoFox-Bot整合越频繁,记忆更精简
consolidation_similarity_threshold = 0.7 # 相似度阈值
consolidation_check_percentage = 0.05 # 检查节点比例
@@ -273,7 +273,7 @@ enable_vector_instant_memory = true # 是否启用基于向量的瞬时记忆
memory_ban_words = [ "表情包", "图片", "回复", "聊天记录" ]
[voice]
enable_asr = false # 是否启用语音识别,启用后麦麦可以识别语音消息,启用该功能需要配置语音识别模型[model.voice]s
enable_asr = false # 是否启用语音识别,启用后MoFox-Bot可以识别语音消息,启用该功能需要配置语音识别模型[model.voice]s
[lpmm_knowledge] # lpmm知识库配置
enable = false # 是否启用lpmm知识库