From dc13343b8885a68b764535c97aa994520160127a Mon Sep 17 00:00:00 2001 From: Windpicker-owo <3431391539@qq.com> Date: Thu, 28 Aug 2025 17:18:01 +0800 Subject: [PATCH] =?UTF-8?q?fix(plugin):=20=E4=B8=BA=E4=BA=8B=E4=BB=B6?= =?UTF-8?q?=E5=A4=84=E7=90=86=E5=99=A8=E6=B7=BB=E5=8A=A0=E5=BC=82=E6=AD=A5?= =?UTF-8?q?=E9=94=81=E5=92=8C=E5=B9=B6=E8=A1=8C=E6=89=A7=E8=A1=8C=E6=94=AF?= =?UTF-8?q?=E6=8C=81?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit 在BaseEvent类中引入asyncio.Lock确保同一个事件不能同时激活多次 重构事件触发逻辑,使用asyncio.gather并行执行所有订阅者处理器 提高事件处理效率的同时保证线程安全 --- src/plugin_system/base/base_event.py | 62 ++++++++++++++++++------- src/plugin_system/core/event_manager.py | 9 ++-- 2 files changed, 47 insertions(+), 24 deletions(-) diff --git a/src/plugin_system/base/base_event.py b/src/plugin_system/base/base_event.py index d010d33dc..9e85dc34d 100644 --- a/src/plugin_system/base/base_event.py +++ b/src/plugin_system/base/base_event.py @@ -1,3 +1,4 @@ +import asyncio from typing import List, Dict, Any, Optional from src.common.logger import get_logger @@ -73,6 +74,8 @@ class BaseEvent: from src.plugin_system.base.base_events_handler import BaseEventHandler self.subscribers: List["BaseEventHandler"] = [] # 订阅该事件的事件处理器列表 + self.event_handle_lock = asyncio.Lock() + def __name__(self): return self.name @@ -88,22 +91,45 @@ class BaseEvent: if not self.enabled: return HandlerResultsCollection([]) - # 按权重从高到低排序订阅者 - # 使用直接属性访问,-1代表自动权重 - sorted_subscribers = sorted(self.subscribers, key=lambda h: h.weight if hasattr(h, 'weight') and h.weight != -1 else 0, reverse=True) - - results = [] - for subscriber in sorted_subscribers: - try: - result = await subscriber.execute(params) - if not result.handler_name: - # 补充handler_name - result.handler_name = subscriber.handler_name if hasattr(subscriber, 'handler_name') else subscriber.__class__.__name__ - results.append(result) - except Exception as e: - # 处理执行异常 + # 使用锁确保同一个事件不能同时激活多次 + async with self.event_handle_lock: + # 按权重从高到低排序订阅者 + # 使用直接属性访问,-1代表自动权重 + sorted_subscribers = sorted(self.subscribers, key=lambda h: h.weight if hasattr(h, 'weight') and h.weight != -1 else 0, reverse=True) + + # 并行执行所有订阅者 + tasks = [] + for subscriber in sorted_subscribers: + # 为每个订阅者创建执行任务 + task = self._execute_subscriber(subscriber, params) + tasks.append(task) + + # 等待所有任务完成 + results = await asyncio.gather(*tasks, return_exceptions=True) + + # 处理执行结果 + processed_results = [] + for i, result in enumerate(results): + subscriber = sorted_subscribers[i] handler_name = subscriber.handler_name if hasattr(subscriber, 'handler_name') else subscriber.__class__.__name__ - logger.error(f"事件处理器 {handler_name} 执行失败: {e}") - results.append(HandlerResult(False, True, str(e), handler_name)) - - return HandlerResultsCollection(results) \ No newline at end of file + + if isinstance(result, Exception): + # 处理执行异常 + logger.error(f"事件处理器 {handler_name} 执行失败: {result}") + processed_results.append(HandlerResult(False, True, str(result), handler_name)) + else: + # 正常执行结果 + if not result.handler_name: + # 补充handler_name + result.handler_name = handler_name + processed_results.append(result) + + return HandlerResultsCollection(processed_results) + + async def _execute_subscriber(self, subscriber, params: dict) -> HandlerResult: + """执行单个订阅者处理器""" + try: + return await subscriber.execute(params) + except Exception as e: + # 异常会在 gather 中捕获,这里直接抛出让 gather 处理 + raise e \ No newline at end of file diff --git a/src/plugin_system/core/event_manager.py b/src/plugin_system/core/event_manager.py index a9ffc0d62..b6845c3ef 100644 --- a/src/plugin_system/core/event_manager.py +++ b/src/plugin_system/core/event_manager.py @@ -2,7 +2,6 @@ 事件管理器 - 实现Event和EventHandler的单例管理 提供统一的事件注册、管理和触发接口 """ -import asyncio from typing import Dict, Type, List, Optional, Any, Union from threading import Lock @@ -39,7 +38,6 @@ class EventManager: self._event_handlers: Dict[str, Type[BaseEventHandler]] = {} self._pending_subscriptions: Dict[str, List[str]] = {} # 缓存失败的订阅 self._initialized = True - self.event_handle_lock = asyncio.Lock() logger.info("EventManager 单例初始化完成") def register_event(self, event_name: Union[EventType, str]) -> bool: @@ -282,8 +280,8 @@ class EventManager: if event is None: logger.error(f"事件 {event_name} 不存在,无法触发") return None - async with self.event_handle_lock: - return await event.activate(params) + + return await event.activate(params) def init_default_events(self) -> None: """初始化默认事件""" @@ -295,8 +293,7 @@ class EventManager: EventType.POST_LLM, EventType.AFTER_LLM, EventType.POST_SEND, - EventType.AFTER_SEND, - EventType.UNKNOWN + EventType.AFTER_SEND ] for event_name in default_events: