From faf13e39b66648a1da3eb8bf31bb4fe9ca15fcf6 Mon Sep 17 00:00:00 2001 From: Windpicker-owo <3431391539@qq.com> Date: Mon, 25 Aug 2025 16:42:34 +0800 Subject: [PATCH] =?UTF-8?q?feat:=20=E8=BF=81=E7=A7=BBWindpicker-owo?= =?UTF-8?q?=E7=9A=84Event=E7=B3=BB=E7=BB=9F=E9=87=8D=E6=9E=84\n\n-=20?= =?UTF-8?q?=E5=BC=95=E5=85=A5=E6=96=B0=E7=9A=84BaseEvent=E5=92=8CHandlerRe?= =?UTF-8?q?sult=E6=9E=B6=E6=9E=84\n-=20=E5=AE=9E=E7=8E=B0=E7=81=B5?= =?UTF-8?q?=E6=B4=BB=E7=9A=84=E4=BA=8B=E4=BB=B6=E8=AE=A2=E9=98=85=E4=B8=8E?= =?UTF-8?q?=E6=BF=80=E6=B4=BB=E6=9C=BA=E5=88=B6\n-=20=E6=B7=BB=E5=8A=A0?= =?UTF-8?q?=E4=BA=8B=E4=BB=B6=E7=AE=A1=E7=90=86=E5=99=A8=E5=8D=95=E4=BE=8B?= =?UTF-8?q?=E6=A8=A1=E5=BC=8F\n-=20=E6=94=AF=E6=8C=81=E4=BA=8B=E4=BB=B6?= =?UTF-8?q?=E7=BC=93=E5=AD=98=E5=92=8C=E6=9D=83=E9=87=8D=E6=8E=92=E5=BA=8F?= =?UTF-8?q?\n-=20=E7=BB=9F=E4=B8=80=E4=BA=8B=E4=BB=B6=E5=A4=84=E7=90=86?= =?UTF-8?q?=E6=8E=A5=E5=8F=A3?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/plugin_system/base/base_event.py | 109 +++++ src/plugin_system/base/base_events_handler.py | 48 ++- src/plugin_system/core/event_manager.py | 382 ++++++++++++++++++ 3 files changed, 531 insertions(+), 8 deletions(-) create mode 100644 src/plugin_system/base/base_event.py create mode 100644 src/plugin_system/core/event_manager.py diff --git a/src/plugin_system/base/base_event.py b/src/plugin_system/base/base_event.py new file mode 100644 index 000000000..d010d33dc --- /dev/null +++ b/src/plugin_system/base/base_event.py @@ -0,0 +1,109 @@ +from typing import List, Dict, Any, Optional +from src.common.logger import get_logger + +logger = get_logger("base_event") + +class HandlerResult: + """事件处理器执行结果 + + 所有事件处理器必须返回此类的实例 + """ + def __init__(self, success: bool, continue_process: bool, message: str = "", handler_name: str = ""): + self.success = success + self.continue_process = continue_process + self.message = message + self.handler_name = handler_name + + def __repr__(self): + return f"HandlerResult(success={self.success}, continue_process={self.continue_process}, message='{self.message}', handler_name='{self.handler_name}')" + +class HandlerResultsCollection: + """HandlerResult集合,提供便捷的查询方法""" + + def __init__(self, results: List[HandlerResult]): + self.results = results + + def all_continue_process(self) -> bool: + """检查是否所有handler的continue_process都为True""" + return all(result.continue_process for result in self.results) + + def get_all_results(self) -> List[HandlerResult]: + """获取所有HandlerResult""" + return self.results + + def get_failed_handlers(self) -> List[HandlerResult]: + """获取执行失败的handler结果""" + return [result for result in self.results if not result.success] + + def get_stopped_handlers(self) -> List[HandlerResult]: + """获取continue_process为False的handler结果""" + return [result for result in self.results if not result.continue_process] + + def get_handler_result(self, handler_name: str) -> Optional[HandlerResult]: + """获取指定handler的结果""" + for result in self.results: + if result.handler_name == handler_name: + return result + return None + + def get_success_count(self) -> int: + """获取成功执行的handler数量""" + return sum(1 for result in self.results if result.success) + + def get_failure_count(self) -> int: + """获取执行失败的handler数量""" + return sum(1 for result in self.results if not result.success) + + def get_summary(self) -> Dict[str, Any]: + """获取执行摘要""" + return { + "total_handlers": len(self.results), + "success_count": self.get_success_count(), + "failure_count": self.get_failure_count(), + "continue_process": self.all_continue_process(), + "failed_handlers": [r.handler_name for r in self.get_failed_handlers()], + "stopped_handlers": [r.handler_name for r in self.get_stopped_handlers()] + } + +class BaseEvent: + def __init__(self, name: str): + self.name = name + self.enabled = True + + from src.plugin_system.base.base_events_handler import BaseEventHandler + self.subscribers: List["BaseEventHandler"] = [] # 订阅该事件的事件处理器列表 + + def __name__(self): + return self.name + + async def activate(self, params: dict) -> HandlerResultsCollection: + """激活事件,执行所有订阅的处理器 + + Args: + params: 传递给处理器的参数 + + Returns: + HandlerResultsCollection: 所有处理器的执行结果集合 + """ + if not self.enabled: + return HandlerResultsCollection([]) + + # 按权重从高到低排序订阅者 + # 使用直接属性访问,-1代表自动权重 + sorted_subscribers = sorted(self.subscribers, key=lambda h: h.weight if hasattr(h, 'weight') and h.weight != -1 else 0, reverse=True) + + results = [] + for subscriber in sorted_subscribers: + try: + result = await subscriber.execute(params) + if not result.handler_name: + # 补充handler_name + result.handler_name = subscriber.handler_name if hasattr(subscriber, 'handler_name') else subscriber.__class__.__name__ + results.append(result) + except Exception as e: + # 处理执行异常 + handler_name = subscriber.handler_name if hasattr(subscriber, 'handler_name') else subscriber.__class__.__name__ + logger.error(f"事件处理器 {handler_name} 执行失败: {e}") + results.append(HandlerResult(False, True, str(e), handler_name)) + + return HandlerResultsCollection(results) \ No newline at end of file diff --git a/src/plugin_system/base/base_events_handler.py b/src/plugin_system/base/base_events_handler.py index 5118885ff..bfc0e5636 100644 --- a/src/plugin_system/base/base_events_handler.py +++ b/src/plugin_system/base/base_events_handler.py @@ -1,8 +1,8 @@ from abc import ABC, abstractmethod -from typing import Tuple, Optional, Dict +from typing import Tuple, Optional, Dict, List, Union from src.common.logger import get_logger -from .component_types import MaiMessages, EventType, EventHandlerInfo, ComponentType +from .component_types import EventType, EventHandlerInfo, ComponentType logger = get_logger("base_event_handler") @@ -13,8 +13,6 @@ class BaseEventHandler(ABC): 所有事件处理器都应该继承这个基类,提供事件处理的基本接口 """ - event_type: EventType = EventType.UNKNOWN - """事件类型,默认为未知""" handler_name: str = "" """处理器名称""" handler_description: str = "" @@ -23,6 +21,8 @@ class BaseEventHandler(ABC): """处理器权重,越大权重越高""" intercept_message: bool = False """是否拦截消息,默认为否""" + init_subscribe: List[Union[EventType, str]] = [EventType.UNKNOWN] + """初始化时订阅的事件名称""" def __init__(self): self.log_prefix = "[EventHandler]" @@ -30,18 +30,51 @@ class BaseEventHandler(ABC): """对应插件名""" self.plugin_config: Optional[Dict] = None """插件配置字典""" - if self.event_type == EventType.UNKNOWN: + self.subscribed_events = [] + """订阅的事件列表""" + if EventType.UNKNOWN in self.init_subscribe: raise NotImplementedError("事件处理器必须指定 event_type") @abstractmethod - async def execute(self, message: MaiMessages) -> Tuple[bool, bool, Optional[str]]: + async def execute(self, kwargs: dict | None) -> Tuple[bool, bool, Optional[str]]: """执行事件处理的抽象方法,子类必须实现 - + Args: + kwargs (dict | None): 事件消息对象,当你注册的事件为ON_START和ON_STOP时message为None Returns: Tuple[bool, bool, Optional[str]]: (是否执行成功, 是否需要继续处理, 可选的返回消息) """ raise NotImplementedError("子类必须实现 execute 方法") + def subscribe(self, event_name: str) -> None: + """订阅一个事件 + + Args: + event_name (str): 要订阅的事件名称 + """ + from src.plugin_system.core.event_manager import event_manager + + if not event_manager.subscribe_handler_to_event(self.handler_name, event_name): + logger.error(f"事件处理器 {self.handler_name} 订阅事件 {event_name} 失败") + return + + logger.debug(f"{self.log_prefix} 订阅事件 {event_name}") + self.subscribed_events.append(event_name) + + def unsubscribe(self, event_name: str) -> None: + """取消订阅一个事件 + + Args: + event_name (str): 要取消订阅的事件名称 + """ + from src.plugin_system.core.event_manager import event_manager + + if event_manager.unsubscribe_handler_from_event(self.handler_name, event_name): + logger.debug(f"{self.log_prefix} 取消订阅事件 {event_name}") + if event_name in self.subscribed_events: + self.subscribed_events.remove(event_name) + else: + logger.warning(f"{self.log_prefix} 未订阅事件 {event_name},无法取消订阅") + @classmethod def get_handler_info(cls) -> "EventHandlerInfo": """获取事件处理器的信息""" @@ -54,7 +87,6 @@ class BaseEventHandler(ABC): name=name, component_type=ComponentType.EVENT_HANDLER, description=getattr(cls, "handler_description", "events处理器"), - event_type=cls.event_type, weight=cls.weight, intercept_message=cls.intercept_message, ) diff --git a/src/plugin_system/core/event_manager.py b/src/plugin_system/core/event_manager.py new file mode 100644 index 000000000..cf5e4614f --- /dev/null +++ b/src/plugin_system/core/event_manager.py @@ -0,0 +1,382 @@ +""" +事件管理器 - 实现Event和EventHandler的单例管理 +提供统一的事件注册、管理和触发接口 +""" + +from typing import Dict, Type, List, Optional, Any, Union +from threading import Lock + +from src.common.logger import get_logger +from src.plugin_system.base.base_event import BaseEvent, HandlerResultsCollection +from src.plugin_system.base.base_events_handler import BaseEventHandler +from src.plugin_system.base.component_types import EventType +logger = get_logger("event_manager") + + +class EventManager: + """事件管理器单例类 + + 负责管理所有事件和事件处理器的注册、订阅、触发等操作 + 使用单例模式确保全局只有一个事件管理实例 + """ + + _instance: Optional['EventManager'] = None + _lock = Lock() + + def __new__(cls) -> 'EventManager': + if cls._instance is None: + with cls._lock: + if cls._instance is None: + cls._instance = super().__new__(cls) + cls._instance._initialized = False + return cls._instance + + def __init__(self) -> None: + if self._initialized: + return + + self._events: Dict[str, BaseEvent] = {} + self._event_handlers: Dict[str, Type[BaseEventHandler]] = {} + self._pending_subscriptions: Dict[str, List[str]] = {} # 缓存失败的订阅 + self._initialized = True + logger.info("EventManager 单例初始化完成") + + def register_event(self, event_name: Union[EventType, str]) -> bool: + """注册一个新的事件 + + Args: + event_name Union[EventType, str]: 事件名称 + + Returns: + bool: 注册成功返回True,已存在返回False + """ + if event_name in self._events: + logger.warning(f"事件 {event_name} 已存在,跳过注册") + return False + + event = BaseEvent(event_name) + self._events[event_name] = event + logger.info(f"事件 {event_name} 注册成功") + + # 检查是否有缓存的订阅需要处理 + self._process_pending_subscriptions(event_name) + + return True + + def get_event(self, event_name: Union[EventType, str]) -> Optional[BaseEvent]: + """获取指定事件实例 + + Args: + event_name Union[EventType, str]: 事件名称 + + Returns: + BaseEvent: 事件实例,不存在返回None + """ + return self._events.get(event_name) + + def get_all_events(self) -> Dict[str, BaseEvent]: + """获取所有已注册的事件 + + Returns: + Dict[str, BaseEvent]: 所有事件的字典 + """ + return self._events.copy() + + def get_enabled_events(self) -> Dict[str, BaseEvent]: + """获取所有已启用的事件 + + Returns: + Dict[str, BaseEvent]: 已启用事件的字典 + """ + return {name: event for name, event in self._events.items() if event.enabled} + + def get_disabled_events(self) -> Dict[str, BaseEvent]: + """获取所有已禁用的事件 + + Returns: + Dict[str, BaseEvent]: 已禁用事件的字典 + """ + return {name: event for name, event in self._events.items() if not event.enabled} + + def enable_event(self, event_name: Union[EventType, str]) -> bool: + """启用指定事件 + + Args: + event_name Union[EventType, str]: 事件名称 + + Returns: + bool: 成功返回True,事件不存在返回False + """ + event = self.get_event(event_name) + if event is None: + logger.error(f"事件 {event_name} 不存在,无法启用") + return False + + event.enabled = True + logger.info(f"事件 {event_name} 已启用") + return True + + def disable_event(self, event_name: Union[EventType, str]) -> bool: + """禁用指定事件 + + Args: + event_name Union[EventType, str]: 事件名称 + + Returns: + bool: 成功返回True,事件不存在返回False + """ + event = self.get_event(event_name) + if event is None: + logger.error(f"事件 {event_name} 不存在,无法禁用") + return False + + event.enabled = False + logger.info(f"事件 {event_name} 已禁用") + return True + + def register_event_handler(self, handler_class: Type[BaseEventHandler]) -> bool: + """注册事件处理器 + + Args: + handler_class (Type[BaseEventHandler]): 事件处理器类 + + Returns: + bool: 注册成功返回True,已存在返回False + """ + handler_name = handler_class.handler_name or handler_class.__name__.lower().replace("handler", "") + + if EventType.UNKNOWN in handler_class.init_subscribe: + logger.error(f"事件处理器 {handler_name} 不能订阅 UNKNOWN 事件") + return False + if handler_name in self._event_handlers: + logger.warning(f"事件处理器 {handler_name} 已存在,跳过注册") + return False + + self._event_handlers[handler_name] = handler_class() + + # 处理init_subscribe,缓存失败的订阅 + if self._event_handlers[handler_name].init_subscribe: + failed_subscriptions = [] + for event_name in self._event_handlers[handler_name].init_subscribe: + if not self.subscribe_handler_to_event(handler_name, event_name): + failed_subscriptions.append(event_name) + + # 缓存失败的订阅 + if failed_subscriptions: + self._pending_subscriptions[handler_name] = failed_subscriptions + logger.warning(f"事件处理器 {handler_name} 的部分订阅失败,已缓存: {failed_subscriptions}") + + logger.info(f"事件处理器 {handler_name} 注册成功") + return True + + def get_event_handler(self, handler_name: str) -> Optional[Type[BaseEventHandler]]: + """获取指定事件处理器类 + + Args: + handler_name (str): 处理器名称 + + Returns: + Type[BaseEventHandler]: 处理器类,不存在返回None + """ + return self._event_handlers.get(handler_name) + + def get_all_event_handlers(self) -> Dict[str, BaseEventHandler]: + """获取所有已注册的事件处理器 + + Returns: + Dict[str, Type[BaseEventHandler]]: 所有处理器的字典 + """ + return self._event_handlers.copy() + + def subscribe_handler_to_event(self, handler_name: str, event_name: Union[EventType, str]) -> bool: + """订阅事件处理器到指定事件 + + Args: + handler_name (str): 处理器名称 + event_name Union[EventType, str]: 事件名称 + + Returns: + bool: 订阅成功返回True + """ + handler_instance = self.get_event_handler(handler_name) + if handler_instance is None: + logger.error(f"事件处理器 {handler_name} 不存在,无法订阅到事件 {event_name}") + return False + + event = self.get_event(event_name) + if event is None: + logger.error(f"事件 {event_name} 不存在,无法订阅事件处理器 {handler_name}") + return False + + if handler_instance in event.subscribers: + logger.warning(f"事件处理器 {handler_name} 已经订阅了事件 {event_name},跳过重复订阅") + return True + + event.subscribers.append(handler_instance) + + # 按权重从高到低排序订阅者 + event.subscribers.sort(key=lambda h: getattr(h, 'weight', 0), reverse=True) + + logger.info(f"事件处理器 {handler_name} 成功订阅到事件 {event_name},当前权重排序完成") + return True + + def unsubscribe_handler_from_event(self, handler_name: str, event_name: Union[EventType, str]) -> bool: + """从指定事件取消订阅事件处理器 + + Args: + handler_name (str): 处理器名称 + event_name Union[EventType, str]: 事件名称 + + Returns: + bool: 取消订阅成功返回True + """ + event = self.get_event(event_name) + if event is None: + logger.error(f"事件 {event_name} 不存在,无法取消订阅") + return False + + # 查找并移除处理器实例 + removed = False + for subscriber in event.subscribers[:]: + if hasattr(subscriber, 'handler_name') and subscriber.handler_name == handler_name: + event.subscribers.remove(subscriber) + removed = True + break + + if removed: + logger.info(f"事件处理器 {handler_name} 成功从事件 {event_name} 取消订阅") + else: + logger.warning(f"事件处理器 {handler_name} 未订阅事件 {event_name}") + + return removed + + def get_event_subscribers(self, event_name: Union[EventType, str]) -> Dict[str, BaseEventHandler]: + """获取订阅指定事件的所有事件处理器 + + Args: + event_name Union[EventType, str]: 事件名称 + + Returns: + Dict[str, BaseEventHandler]: 处理器字典,键为处理器名称,值为处理器实例 + """ + event = self.get_event(event_name) + if event is None: + return {} + + return {handler.handler_name: handler for handler in event.subscribers} + + async def trigger_event(self, event_name: Union[EventType, str], **kwargs) -> Optional[HandlerResultsCollection]: + """触发指定事件 + + Args: + event_name Union[EventType, str]: 事件名称 + **kwargs: 传递给处理器的参数 + + Returns: + HandlerResultsCollection: 所有处理器的执行结果,事件不存在返回None + """ + params = kwargs or {} + + event = self.get_event(event_name) + if event is None: + logger.error(f"事件 {event_name} 不存在,无法触发") + return None + + return await event.activate(params) + + def init_default_events(self) -> None: + """初始化默认事件""" + default_events = [ + EventType.ON_START, + EventType.ON_STOP, + EventType.ON_PLAN, + EventType.ON_MESSAGE, + EventType.POST_LLM, + EventType.AFTER_LLM, + EventType.POST_SEND, + EventType.AFTER_SEND, + EventType.UNKNOWN + ] + + for event_name in default_events: + self.register_event(event_name) + + logger.info("默认事件初始化完成") + + def clear_all_events(self) -> None: + """清除所有事件和处理器(主要用于测试)""" + self._events.clear() + self._event_handlers.clear() + logger.info("所有事件和处理器已清除") + + def get_event_summary(self) -> Dict[str, Any]: + """获取事件系统摘要 + + Returns: + Dict[str, Any]: 包含事件系统统计信息的字典 + """ + enabled_events = self.get_enabled_events() + disabled_events = self.get_disabled_events() + + return { + "total_events": len(self._events), + "enabled_events": len(enabled_events), + "disabled_events": len(disabled_events), + "total_handlers": len(self._event_handlers), + "event_names": list(self._events.keys()), + "handler_names": list(self._event_handlers.keys()), + "pending_subscriptions": len(self._pending_subscriptions) + } + + def _process_pending_subscriptions(self, event_name: Union[EventType, str]) -> None: + """处理指定事件的缓存订阅 + + Args: + event_name Union[EventType, str]: 事件名称 + """ + handlers_to_remove = [] + + for handler_name, pending_events in self._pending_subscriptions.items(): + if event_name in pending_events: + if self.subscribe_handler_to_event(handler_name, event_name): + pending_events.remove(event_name) + logger.info(f"成功处理缓存订阅: {handler_name} -> {event_name}") + + # 如果该处理器没有更多待处理订阅,标记为移除 + if not pending_events: + handlers_to_remove.append(handler_name) + + # 清理已完成的处理器缓存 + for handler_name in handlers_to_remove: + del self._pending_subscriptions[handler_name] + + def process_all_pending_subscriptions(self) -> int: + """处理所有缓存的订阅 + + Returns: + int: 成功处理的订阅数量 + """ + processed_count = 0 + + # 复制待处理订阅,避免在迭代时修改字典 + pending_copy = dict(self._pending_subscriptions) + + for handler_name, pending_events in pending_copy.items(): + for event_name in pending_events[:]: # 使用切片避免修改列表 + if self.subscribe_handler_to_event(handler_name, event_name): + pending_events.remove(event_name) + processed_count += 1 + + # 清理已完成的处理器缓存 + handlers_to_remove = [name for name, events in self._pending_subscriptions.items() if not events] + for handler_name in handlers_to_remove: + del self._pending_subscriptions[handler_name] + + if processed_count > 0: + logger.info(f"批量处理缓存订阅完成,共处理 {processed_count} 个订阅") + + return processed_count + + +# 创建全局事件管理器实例 +event_manager = EventManager() \ No newline at end of file