refactor(plugin_system): 统一事件和处理器相关方法的类型注解

将事件管理器中多个方法的类型注解从 `type[BaseEventHandler]` 调整为 `BaseEventHandler`,以反映其处理的是处理器实例而非类本身。同时,优化了事件名称的处理逻辑,确保在整个系统中对 `EventType` 枚举和字符串类型名称的处理保持一致性。

- 将 `_event_handlers` 的类型注解从 `dict[str, type[BaseEventHandler]]` 修改为 `dict[str, BaseEventHandler]`
- 相应地更新了 `get_event_handler` 和 `get_all_event_handlers` 方法的返回类型注解
- 在多个方法内部增加了对 `EventType` 枚举的处理,使其能够接受枚举成员作为参数,提高了类型安全性和代码可读性
This commit is contained in:
minecraft1024a
2025-11-01 21:06:47 +08:00
parent 08a9a2c2e8
commit 64efe2e690

View File

@@ -37,7 +37,7 @@ class EventManager:
return
self._events: dict[str, BaseEvent] = {}
self._event_handlers: dict[str, type[BaseEventHandler]] = {}
self._event_handlers: dict[str, BaseEventHandler] = {}
self._pending_subscriptions: dict[str, list[str]] = {} # 缓存失败的订阅
self._scheduler_callback: Any | None = None # scheduler 回调函数
self._initialized = True
@@ -62,16 +62,17 @@ class EventManager:
allowed_triggers = []
if allowed_subscribers is None:
allowed_subscribers = []
if event_name in self._events:
logger.warning(f"事件 {event_name} 已存在,跳过注册")
_event_name = event_name.value if isinstance(event_name, EventType) else event_name
if _event_name in self._events:
logger.warning(f"事件 {_event_name} 已存在,跳过注册")
return False
event = BaseEvent(event_name, allowed_subscribers, allowed_triggers)
self._events[event_name] = event
logger.debug(f"事件 {event_name} 注册成功")
event = BaseEvent(_event_name, allowed_subscribers, allowed_triggers)
self._events[_event_name] = event
logger.debug(f"事件 {_event_name} 注册成功")
# 检查是否有缓存的订阅需要处理
self._process_pending_subscriptions(event_name)
self._process_pending_subscriptions(_event_name)
return True
@@ -84,7 +85,8 @@ class EventManager:
Returns:
BaseEvent: 事件实例不存在返回None
"""
return self._events.get(event_name)
_event_name = event_name.value if isinstance(event_name, EventType) else event_name
return self._events.get(_event_name)
def get_all_events(self) -> dict[str, BaseEvent]:
"""获取所有已注册的事件
@@ -175,10 +177,11 @@ class EventManager:
# 处理init_subscribe缓存失败的订阅
if self._event_handlers[handler_name].init_subscribe:
failed_subscriptions = [
event_name for event_name in self._event_handlers[handler_name].init_subscribe
if not self.subscribe_handler_to_event(handler_name, event_name)
]
failed_subscriptions: list[str] = []
for event_name in self._event_handlers[handler_name].init_subscribe:
if not self.subscribe_handler_to_event(handler_name, event_name):
_event_name = event_name.value if isinstance(event_name, EventType) else event_name
failed_subscriptions.append(_event_name)
# 缓存失败的订阅
if failed_subscriptions:
@@ -188,22 +191,22 @@ class EventManager:
logger.info(f"事件处理器 {handler_name} 注册成功")
return True
def get_event_handler(self, handler_name: str) -> type[BaseEventHandler] | None:
def get_event_handler(self, handler_name: str) -> BaseEventHandler | None:
"""获取指定事件处理器实例
Args:
handler_name (str): 处理器名称
Returns:
Type[BaseEventHandler]: 处理器实例不存在返回None
BaseEventHandler: 处理器实例不存在返回None
"""
return self._event_handlers.get(handler_name)
def get_all_event_handlers(self) -> dict[str, type[BaseEventHandler]]:
def get_all_event_handlers(self) -> dict[str, BaseEventHandler]:
"""获取所有已注册的事件处理器
Returns:
Dict[str, Type[BaseEventHandler]]: 所有处理器的字典
Dict[str, BaseEventHandler]: 所有处理器的字典
"""
return self._event_handlers.copy()
@@ -383,7 +386,7 @@ class EventManager:
"pending_subscriptions": len(self._pending_subscriptions),
}
def _process_pending_subscriptions(self, event_name: EventType | str) -> None:
def _process_pending_subscriptions(self, event_name: str) -> None:
"""处理指定事件的缓存订阅
Args: