From cbf9a21c1f36e329ed56c1816039926b276dab88 Mon Sep 17 00:00:00 2001 From: Windpicker-owo <3431391539@qq.com> Date: Mon, 25 Aug 2025 17:46:23 +0800 Subject: [PATCH] =?UTF-8?q?event=E9=87=8D=E6=9E=84=E6=9A=82=E6=97=B6?= =?UTF-8?q?=E5=AE=8C=E6=88=90=EF=BC=8C=E5=BE=85=E6=B5=8B=E8=AF=95?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/chat/chat_loop/cycle_processor.py | 7 + src/chat/message_receive/bot.py | 7 +- src/chat/replyer/default_generator.py | 18 +- src/main.py | 10 +- src/plugin_system/base/base_tool.py | 6 +- src/plugin_system/base/component_types.py | 30 ++- src/plugin_system/core/__init__.py | 4 +- src/plugin_system/core/component_registry.py | 27 +- src/plugin_system/core/event_manager.py | 4 +- src/plugin_system/core/events_manager.py | 262 ------------------- 10 files changed, 77 insertions(+), 298 deletions(-) delete mode 100644 src/plugin_system/core/events_manager.py diff --git a/src/chat/chat_loop/cycle_processor.py b/src/chat/chat_loop/cycle_processor.py index 9f05b4a3f..927f45949 100644 --- a/src/chat/chat_loop/cycle_processor.py +++ b/src/chat/chat_loop/cycle_processor.py @@ -97,6 +97,13 @@ class CycleProcessor: if not skip_planner: plan_result, target_message = await self.action_planner.plan(mode=self.context.loop_mode) + from src.plugin_system.core.event_manager import event_manager + from src.plugin_system.base.component_types import EventType + # 触发 ON_PLAN 事件 + result = await event_manager.trigger_event(EventType.ON_PLAN, stream_id=self.chat_stream.stream_id) + if result and not result.all_continue_process(): + return + action_result = plan_result.get("action_result", {}) if isinstance(plan_result, dict) else {} if not isinstance(action_result, dict): action_result = {} diff --git a/src/chat/message_receive/bot.py b/src/chat/message_receive/bot.py index 226f2ff7d..0067e1b47 100644 --- a/src/chat/message_receive/bot.py +++ b/src/chat/message_receive/bot.py @@ -13,7 +13,7 @@ from src.chat.message_receive.message import MessageRecv, MessageRecvS4U from src.chat.message_receive.storage import MessageStorage from src.chat.heart_flow.heartflow_message_processor import HeartFCMessageReceiver from src.chat.utils.prompt_builder import Prompt, global_prompt_manager -from src.plugin_system.core import component_registry, events_manager, global_announcement_manager +from src.plugin_system.core import component_registry, event_manager, global_announcement_manager from src.plugin_system.base import BaseCommand, EventType from src.mais4u.mais4u_chat.s4u_msg_processor import S4UMessageProcessor @@ -303,8 +303,9 @@ class ChatBot: logger.info(f"命令处理完成,跳过后续消息处理: {cmd_result}") return - if not await events_manager.handle_mai_events(EventType.ON_MESSAGE, message): - return + result = await event_manager.trigger_event(EventType.ON_MESSAGE,message=message) + if not result.all_continue_process(): + raise UserWarning(f"插件{result.get_summary().get('stopped_handlers','')}于消息到达时取消了消息处理") # 确认从接口发来的message是否有自定义的prompt模板信息 if message.message_info.template_info and not message.message_info.template_info.template_default: diff --git a/src/chat/replyer/default_generator.py b/src/chat/replyer/default_generator.py index 0093611d7..113689d45 100644 --- a/src/chat/replyer/default_generator.py +++ b/src/chat/replyer/default_generator.py @@ -366,13 +366,12 @@ class DefaultReplyer: if not prompt: logger.warning("构建prompt失败,跳过回复生成") return False, None, None - from src.plugin_system.core.events_manager import events_manager + from src.plugin_system.core.event_manager import event_manager if not from_plugin: - if not await events_manager.handle_mai_events( - EventType.POST_LLM, None, prompt, None, stream_id=stream_id - ): - raise UserWarning("插件于请求前中断了内容生成") + result = await event_manager.trigger_event(EventType.POST_LLM,prompt=prompt,llm_response=llm_response,stream_id=stream_id) + if not result.all_continue_process(): + raise UserWarning(f"插件{result.get_summary().get('stopped_handlers', '')}于请求前中断了内容生成") # 4. 调用 LLM 生成回复 content = None @@ -388,10 +387,11 @@ class DefaultReplyer: "model": model_name, "tool_calls": tool_call, } - if not from_plugin and not await events_manager.handle_mai_events( - EventType.AFTER_LLM, None, prompt, llm_response, stream_id=stream_id - ): - raise UserWarning("插件于请求后取消了内容生成") + # 触发 AFTER_LLM 事件 + if not from_plugin: + result = await event_manager.trigger_event(EventType.AFTER_LLM,prompt=prompt,llm_response=llm_response,stream_id=stream_id) + if not result.all_continue_process(): + raise UserWarning(f"插件{result.get_summary().get("stopped_handlers","")}于请求后取消了内容生成") except UserWarning as e: raise e except Exception as llm_e: diff --git a/src/main.py b/src/main.py index ceeec4940..9d15443d1 100644 --- a/src/main.py +++ b/src/main.py @@ -19,6 +19,8 @@ from src.mood.mood_manager import mood_manager from rich.traceback import install from src.manager.schedule_manager import schedule_manager from src.schedule.monthly_plan_manager import MonthlyPlanManager +from src.plugin_system.core.event_manager import event_manager +from src.plugin_system.base.component_types import EventType # from src.api.main import start_api_server # 导入新的插件管理器和热重载管理器 @@ -147,6 +149,9 @@ MaiMbot-Pro-Max(第三方修改版) # 添加统计信息输出任务 await async_task_manager.add_task(StatisticOutputTask()) + # 注册默认事件 + event_manager.init_default_events() + # 初始化权限管理器 from src.plugin_system.core.permission_manager import PermissionManager from src.plugin_system.apis.permission_api import permission_api @@ -161,8 +166,10 @@ MaiMbot-Pro-Max(第三方修改版) # 加载所有actions,包括默认的和插件的 plugin_manager.load_all_plugins() - # 启动插件热重载系统 + # 处理所有缓存的事件订阅(插件加载完成后) + event_manager.process_all_pending_subscriptions() + # 启动插件热重载系统 hot_reload_manager.start() # 初始化表情管理器 @@ -225,6 +232,7 @@ MaiMbot-Pro-Max(第三方修改版) try: + await event_manager.trigger_event(EventType.ON_START) init_time = int(1000 * (time.time() - init_start_time)) logger.info(f"初始化完成,神经元放电{init_time}次") except Exception as e: diff --git a/src/plugin_system/base/base_tool.py b/src/plugin_system/base/base_tool.py index e2220fd91..974488063 100644 --- a/src/plugin_system/base/base_tool.py +++ b/src/plugin_system/base/base_tool.py @@ -72,7 +72,7 @@ class BaseTool(ABC): """ raise NotImplementedError("子类必须实现execute方法") - async def direct_execute(self, **function_args: dict[str, Any]) -> dict[str, Any]: + async def direct_execute(self, **kwargs: dict[str, Any]) -> dict[str, Any]: """直接执行工具函数(供插件调用) 通过该方法,插件可以直接调用工具,而不需要传入字典格式的参数 插件可以直接调用此方法,用更加明了的方式传入参数 @@ -88,10 +88,10 @@ class BaseTool(ABC): """ parameter_required = [param[0] for param in self.parameters if param[3]] # 获取所有必填参数名 for param_name in parameter_required: - if param_name not in function_args: + if param_name not in kwargs: raise ValueError(f"工具类 {self.__class__.__name__} 缺少必要参数: {param_name}") - return await self.execute(function_args) + return await self.execute(kwargs) def get_config(self, key: str, default=None): """获取插件配置值,使用嵌套键访问 diff --git a/src/plugin_system/base/component_types.py b/src/plugin_system/base/component_types.py index e4576287a..0d22bf63e 100644 --- a/src/plugin_system/base/component_types.py +++ b/src/plugin_system/base/component_types.py @@ -12,9 +12,9 @@ class ComponentType(Enum): ACTION = "action" # 动作组件 COMMAND = "command" # 命令组件 - TOOL = "tool" # 服务组件(预留) + TOOL = "tool" # 工具组件 SCHEDULER = "scheduler" # 定时任务组件(预留) - EVENT_HANDLER = "event_handler" # 事件处理组件(预留) + EVENT_HANDLER = "event_handler" # 事件处理组件 def __str__(self) -> str: return self.value @@ -188,6 +188,32 @@ class EventHandlerInfo(ComponentInfo): super().__post_init__() self.component_type = ComponentType.EVENT_HANDLER +@dataclass +class EventInfo(ComponentInfo): + """事件组件信息""" + + def __post_init__(self): + super().__post_init__() + self.component_type = ComponentType.EVENT + +# 事件类型枚举 +class EventType(Enum): + """ + 事件类型枚举类 + """ + + ON_START = "on_start" # 启动事件,用于调用按时任务 + ON_STOP = "on_stop" # 停止事件,用于调用按时任务 + ON_MESSAGE = "on_message" + ON_PLAN = "on_plan" + POST_LLM = "post_llm" + AFTER_LLM = "after_llm" + POST_SEND = "post_send" + AFTER_SEND = "after_send" + UNKNOWN = "unknown" # 未知事件类型 + + def __str__(self) -> str: + return self.value @dataclass class PluginInfo: diff --git a/src/plugin_system/core/__init__.py b/src/plugin_system/core/__init__.py index 6867991ac..a73b99acb 100644 --- a/src/plugin_system/core/__init__.py +++ b/src/plugin_system/core/__init__.py @@ -6,14 +6,14 @@ from src.plugin_system.core.plugin_manager import plugin_manager from src.plugin_system.core.component_registry import component_registry -from src.plugin_system.core.events_manager import events_manager +from src.plugin_system.core.event_manager import event_manager from src.plugin_system.core.global_announcement_manager import global_announcement_manager from src.plugin_system.core.plugin_hot_reload import hot_reload_manager __all__ = [ "plugin_manager", "component_registry", - "events_manager", + "event_manager", "global_announcement_manager", "hot_reload_manager", ] diff --git a/src/plugin_system/core/component_registry.py b/src/plugin_system/core/component_registry.py index 57b16b294..dc441d7e9 100644 --- a/src/plugin_system/core/component_registry.py +++ b/src/plugin_system/core/component_registry.py @@ -10,12 +10,14 @@ from src.plugin_system.base.component_types import ( CommandInfo, EventHandlerInfo, PluginInfo, + EventInfo, ComponentType, ) from src.plugin_system.base.base_command import BaseCommand from src.plugin_system.base.base_action import BaseAction from src.plugin_system.base.base_tool import BaseTool from src.plugin_system.base.base_events_handler import BaseEventHandler +from src.plugin_system.base.base_event import BaseEvent logger = get_logger("component_registry") @@ -220,14 +222,6 @@ class ComponentRegistry: logger.warning(f"EventHandler组件 {handler_name} 未启用") return True # 未启用,但是也是注册成功 - from .events_manager import events_manager # 延迟导入防止循环导入问题 - - if events_manager.register_event_subscriber(handler_info, handler_class): - self._enabled_event_handlers[handler_name] = handler_class - return True - else: - logger.error(f"注册事件处理器 {handler_name} 失败") - return False # === 组件移除相关 === @@ -261,12 +255,14 @@ class ComponentRegistry: case ComponentType.EVENT_HANDLER: # 移除EventHandler注册和事件订阅 - from .events_manager import events_manager # 延迟导入防止循环导入问题 + from .event_manager import event_manager # 延迟导入防止循环导入问题 self._event_handler_registry.pop(component_name, None) self._enabled_event_handlers.pop(component_name, None) try: - await events_manager.unregister_event_subscriber(component_name) + handler = event_manager.get_event_handler(component_name) + for event in handler.subscribed_events: + await event_manager.unsubscribe_handler_from_event(event, component_name) logger.debug(f"已移除EventHandler组件: {component_name}") except Exception as e: logger.warning(f"移除EventHandler事件订阅时出错: {e}") @@ -336,9 +332,9 @@ class ComponentRegistry: assert isinstance(target_component_info, EventHandlerInfo) assert issubclass(target_component_class, BaseEventHandler) self._enabled_event_handlers[component_name] = target_component_class - from .events_manager import events_manager # 延迟导入防止循环导入问题 + from .event_manager import event_manager # 延迟导入防止循环导入问题 + event_manager.register_event_handler(component_name) - events_manager.register_event_subscriber(target_component_info, target_component_class) namespaced_name = f"{component_type}.{component_name}" self._components[namespaced_name].enabled = True self._components_by_type[component_type][component_name].enabled = True @@ -369,9 +365,12 @@ class ComponentRegistry: self._llm_available_tools.pop(component_name) case ComponentType.EVENT_HANDLER: self._enabled_event_handlers.pop(component_name) - from .events_manager import events_manager # 延迟导入防止循环导入问题 + from .event_manager import event_manager # 延迟导入防止循环导入问题 + + handler = event_manager.get_event_handler(component_name) + for event in handler.subscribed_events: + await event_manager.unsubscribe_handler_from_event(event, component_name) - await events_manager.unregister_event_subscriber(component_name) self._components[component_name].enabled = False self._components_by_type[component_type][component_name].enabled = False logger.info(f"组件 {component_name} 已禁用") diff --git a/src/plugin_system/core/event_manager.py b/src/plugin_system/core/event_manager.py index cf5e4614f..ea3d04a70 100644 --- a/src/plugin_system/core/event_manager.py +++ b/src/plugin_system/core/event_manager.py @@ -170,13 +170,13 @@ class EventManager: return True def get_event_handler(self, handler_name: str) -> Optional[Type[BaseEventHandler]]: - """获取指定事件处理器类 + """获取指定事件处理器实例 Args: handler_name (str): 处理器名称 Returns: - Type[BaseEventHandler]: 处理器类,不存在返回None + Type[BaseEventHandler]: 处理器实例,不存在返回None """ return self._event_handlers.get(handler_name) diff --git a/src/plugin_system/core/events_manager.py b/src/plugin_system/core/events_manager.py deleted file mode 100644 index a9c7f683c..000000000 --- a/src/plugin_system/core/events_manager.py +++ /dev/null @@ -1,262 +0,0 @@ -import asyncio -import contextlib -from typing import List, Dict, Optional, Type, Tuple, Any - -from src.chat.message_receive.message import MessageRecv -from src.chat.message_receive.chat_stream import get_chat_manager -from src.common.logger import get_logger -from src.plugin_system.base.component_types import EventType, EventHandlerInfo, MaiMessages -from src.plugin_system.base.base_events_handler import BaseEventHandler -from .global_announcement_manager import global_announcement_manager - -logger = get_logger("events_manager") - - -class EventsManager: - def __init__(self): - # 有权重的 events 订阅者注册表 - self._events_subscribers: Dict[EventType, List[BaseEventHandler]] = {event: [] for event in EventType} - self._handler_mapping: Dict[str, Type[BaseEventHandler]] = {} # 事件处理器映射表 - self._handler_tasks: Dict[str, List[asyncio.Task]] = {} # 事件处理器正在处理的任务 - - def register_event_subscriber(self, handler_info: EventHandlerInfo, handler_class: Type[BaseEventHandler]) -> bool: - """注册事件处理器 - - Args: - handler_info (EventHandlerInfo): 事件处理器信息 - handler_class (Type[BaseEventHandler]): 事件处理器类 - - Returns: - bool: 是否注册成功 - """ - handler_name = handler_info.name - - if handler_name in self._handler_mapping: - logger.warning(f"事件处理器 {handler_name} 已存在,跳过注册") - return True - - if not issubclass(handler_class, BaseEventHandler): - logger.error(f"类 {handler_class.__name__} 不是 BaseEventHandler 的子类") - return False - - self._handler_mapping[handler_name] = handler_class - return self._insert_event_handler(handler_class, handler_info) - - async def handle_mai_events( - self, - event_type: EventType, - message: Optional[MessageRecv] = None, - llm_prompt: Optional[str] = None, - llm_response: Optional[Dict[str, Any]] = None, - stream_id: Optional[str] = None, - action_usage: Optional[List[str]] = None, - ) -> bool: - """处理 events""" - from src.plugin_system.core import component_registry - - continue_flag = True - transformed_message: Optional[MaiMessages] = None - if not message: - assert stream_id, "如果没有消息,必须提供流ID" - if event_type in [EventType.ON_MESSAGE, EventType.ON_PLAN, EventType.POST_LLM, EventType.AFTER_LLM]: - transformed_message = self._build_message_from_stream(stream_id, llm_prompt, llm_response) - else: - transformed_message = self._transform_event_without_message( - stream_id, llm_prompt, llm_response, action_usage - ) - else: - transformed_message = self._transform_event_message(message, llm_prompt, llm_response) - for handler in self._events_subscribers.get(event_type, []): - if transformed_message.stream_id: - stream_id = transformed_message.stream_id - if handler.handler_name in global_announcement_manager.get_disabled_chat_event_handlers(stream_id): - continue - handler.set_plugin_config(component_registry.get_plugin_config(handler.plugin_name) or {}) - if handler.intercept_message: - try: - success, continue_processing, result = await handler.execute(transformed_message) - if not success: - logger.error(f"EventHandler {handler.handler_name} 执行失败: {result}") - else: - logger.debug(f"EventHandler {handler.handler_name} 执行成功: {result}") - continue_flag = continue_flag and continue_processing - except Exception as e: - logger.error(f"EventHandler {handler.handler_name} 发生异常: {e}") - continue - else: - try: - handler_task = asyncio.create_task(handler.execute(transformed_message)) - handler_task.add_done_callback(self._task_done_callback) - handler_task.set_name(f"{handler.plugin_name}-{handler.handler_name}") - if handler.handler_name not in self._handler_tasks: - self._handler_tasks[handler.handler_name] = [] - self._handler_tasks[handler.handler_name].append(handler_task) - except Exception as e: - logger.error(f"创建事件处理器任务 {handler.handler_name} 时发生异常: {e}") - continue - return continue_flag - - def _insert_event_handler(self, handler_class: Type[BaseEventHandler], handler_info: EventHandlerInfo) -> bool: - """插入事件处理器到对应的事件类型列表中并设置其插件配置""" - if handler_class.event_type == EventType.UNKNOWN: - logger.error(f"事件处理器 {handler_class.__name__} 的事件类型未知,无法注册") - return False - - handler_instance = handler_class() - handler_instance.set_plugin_name(handler_info.plugin_name or "unknown") - self._events_subscribers[handler_class.event_type].append(handler_instance) - self._events_subscribers[handler_class.event_type].sort(key=lambda x: x.weight, reverse=True) - - return True - - def _remove_event_handler_instance(self, handler_class: Type[BaseEventHandler]) -> bool: - """从事件类型列表中移除事件处理器""" - display_handler_name = handler_class.handler_name or handler_class.__name__ - if handler_class.event_type == EventType.UNKNOWN: - logger.warning(f"事件处理器 {display_handler_name} 的事件类型未知,不存在于处理器列表中") - return False - - handlers = self._events_subscribers[handler_class.event_type] - for i, handler in enumerate(handlers): - if isinstance(handler, handler_class): - del handlers[i] - logger.debug(f"事件处理器 {display_handler_name} 已移除") - return True - - logger.warning(f"未找到事件处理器 {display_handler_name},无法移除") - return False - - def _transform_event_message( - self, message: MessageRecv, llm_prompt: Optional[str] = None, llm_response: Optional[Dict[str, Any]] = None - ) -> MaiMessages: - """转换事件消息格式""" - # 直接赋值部分内容 - transformed_message = MaiMessages( - llm_prompt=llm_prompt, - llm_response_content=llm_response.get("content") if llm_response else None, - llm_response_reasoning=llm_response.get("reasoning") if llm_response else None, - llm_response_model=llm_response.get("model") if llm_response else None, - llm_response_tool_call=llm_response.get("tool_calls") if llm_response else None, - raw_message=message.raw_message, - additional_data=message.message_info.additional_config or {}, - ) - - # 消息段处理 - if message.message_segment.type == "seglist": - transformed_message.message_segments = list(message.message_segment.data) # type: ignore - else: - transformed_message.message_segments = [message.message_segment] - - # stream_id 处理 - if hasattr(message, "chat_stream") and message.chat_stream: - transformed_message.stream_id = message.chat_stream.stream_id - - # 处理后文本 - transformed_message.plain_text = message.processed_plain_text - - # 基本信息 - if hasattr(message, "message_info") and message.message_info: - if message.message_info.platform: - transformed_message.message_base_info["platform"] = message.message_info.platform - if message.message_info.group_info: - transformed_message.is_group_message = True - transformed_message.message_base_info.update( - { - "group_id": message.message_info.group_info.group_id, - "group_name": message.message_info.group_info.group_name, - } - ) - if message.message_info.user_info: - if not transformed_message.is_group_message: - transformed_message.is_private_message = True - transformed_message.message_base_info.update( - { - "user_id": message.message_info.user_info.user_id, - "user_cardname": message.message_info.user_info.user_cardname, # 用户群昵称 - "user_nickname": message.message_info.user_info.user_nickname, # 用户昵称(用户名) - } - ) - - return transformed_message - - def _build_message_from_stream( - self, stream_id: str, llm_prompt: Optional[str] = None, llm_response: Optional[Dict[str, Any]] = None - ) -> MaiMessages: - """从流ID构建消息""" - chat_stream = get_chat_manager().get_stream(stream_id) - assert chat_stream, f"未找到流ID为 {stream_id} 的聊天流" - message = chat_stream.context.get_last_message() - return self._transform_event_message(message, llm_prompt, llm_response) - - def _transform_event_without_message( - self, - stream_id: str, - llm_prompt: Optional[str] = None, - llm_response: Optional[Dict[str, Any]] = None, - action_usage: Optional[List[str]] = None, - ) -> MaiMessages: - """没有message对象时进行转换""" - chat_stream = get_chat_manager().get_stream(stream_id) - assert chat_stream, f"未找到流ID为 {stream_id} 的聊天流" - return MaiMessages( - stream_id=stream_id, - llm_prompt=llm_prompt, - llm_response_content=(llm_response.get("content") if llm_response else None), - llm_response_reasoning=(llm_response.get("reasoning") if llm_response else None), - llm_response_model=llm_response.get("model") if llm_response else None, - llm_response_tool_call=(llm_response.get("tool_calls") if llm_response else None), - is_group_message=(not (not chat_stream.group_info)), - is_private_message=(not chat_stream.group_info), - action_usage=action_usage, - additional_data={"response_is_processed": True}, - ) - - def _task_done_callback(self, task: asyncio.Task[Tuple[bool, bool, str | None]]): - """任务完成回调""" - task_name = task.get_name() or "Unknown Task" - try: - success, _, result = task.result() # 忽略是否继续的标志,因为消息本身未被拦截 - if success: - logger.debug(f"事件处理任务 {task_name} 已成功完成: {result}") - else: - logger.error(f"事件处理任务 {task_name} 执行失败: {result}") - except asyncio.CancelledError: - pass - except Exception as e: - logger.error(f"事件处理任务 {task_name} 发生异常: {e}") - finally: - with contextlib.suppress(ValueError, KeyError): - self._handler_tasks[task_name].remove(task) - - async def cancel_handler_tasks(self, handler_name: str) -> None: - tasks_to_be_cancelled = self._handler_tasks.get(handler_name, []) - if remaining_tasks := [task for task in tasks_to_be_cancelled if not task.done()]: - for task in remaining_tasks: - task.cancel() - try: - await asyncio.wait_for(asyncio.gather(*remaining_tasks, return_exceptions=True), timeout=5) - logger.info(f"已取消事件处理器 {handler_name} 的所有任务") - except asyncio.TimeoutError: - logger.warning(f"取消事件处理器 {handler_name} 的任务超时,开始强制取消") - except Exception as e: - logger.error(f"取消事件处理器 {handler_name} 的任务时发生异常: {e}") - if handler_name in self._handler_tasks: - del self._handler_tasks[handler_name] - - async def unregister_event_subscriber(self, handler_name: str) -> bool: - """取消注册事件处理器""" - if handler_name not in self._handler_mapping: - logger.warning(f"事件处理器 {handler_name} 不存在,无法取消注册") - return False - - await self.cancel_handler_tasks(handler_name) - - handler_class = self._handler_mapping.pop(handler_name) - if not self._remove_event_handler_instance(handler_class): - return False - - logger.info(f"事件处理器 {handler_name} 已成功取消注册") - return True - - -events_manager = EventsManager()