fix(plugin): 为事件处理器添加异步锁和并行执行支持

在BaseEvent类中引入asyncio.Lock确保同一个事件不能同时激活多次
重构事件触发逻辑,使用asyncio.gather并行执行所有订阅者处理器
提高事件处理效率的同时保证线程安全
This commit is contained in:
Windpicker-owo
2025-08-28 17:18:01 +08:00
parent 671ee6cbcf
commit dc13343b88
2 changed files with 47 additions and 24 deletions

View File

@@ -1,3 +1,4 @@
import asyncio
from typing import List, Dict, Any, Optional from typing import List, Dict, Any, Optional
from src.common.logger import get_logger from src.common.logger import get_logger
@@ -73,6 +74,8 @@ class BaseEvent:
from src.plugin_system.base.base_events_handler import BaseEventHandler from src.plugin_system.base.base_events_handler import BaseEventHandler
self.subscribers: List["BaseEventHandler"] = [] # 订阅该事件的事件处理器列表 self.subscribers: List["BaseEventHandler"] = [] # 订阅该事件的事件处理器列表
self.event_handle_lock = asyncio.Lock()
def __name__(self): def __name__(self):
return self.name return self.name
@@ -88,22 +91,45 @@ class BaseEvent:
if not self.enabled: if not self.enabled:
return HandlerResultsCollection([]) return HandlerResultsCollection([])
# 按权重从高到低排序订阅者 # 使用锁确保同一个事件不能同时激活多次
# 使用直接属性访问,-1代表自动权重 async with self.event_handle_lock:
sorted_subscribers = sorted(self.subscribers, key=lambda h: h.weight if hasattr(h, 'weight') and h.weight != -1 else 0, reverse=True) # 按权重从高到低排序订阅者
# 使用直接属性访问,-1代表自动权重
sorted_subscribers = sorted(self.subscribers, key=lambda h: h.weight if hasattr(h, 'weight') and h.weight != -1 else 0, reverse=True)
results = [] # 并行执行所有订阅者
for subscriber in sorted_subscribers: tasks = []
try: for subscriber in sorted_subscribers:
result = await subscriber.execute(params) # 为每个订阅者创建执行任务
if not result.handler_name: task = self._execute_subscriber(subscriber, params)
# 补充handler_name tasks.append(task)
result.handler_name = subscriber.handler_name if hasattr(subscriber, 'handler_name') else subscriber.__class__.__name__
results.append(result) # 等待所有任务完成
except Exception as e: results = await asyncio.gather(*tasks, return_exceptions=True)
# 处理执行异常
# 处理执行结果
processed_results = []
for i, result in enumerate(results):
subscriber = sorted_subscribers[i]
handler_name = subscriber.handler_name if hasattr(subscriber, 'handler_name') else subscriber.__class__.__name__ handler_name = subscriber.handler_name if hasattr(subscriber, 'handler_name') else subscriber.__class__.__name__
logger.error(f"事件处理器 {handler_name} 执行失败: {e}")
results.append(HandlerResult(False, True, str(e), handler_name))
return HandlerResultsCollection(results) if isinstance(result, Exception):
# 处理执行异常
logger.error(f"事件处理器 {handler_name} 执行失败: {result}")
processed_results.append(HandlerResult(False, True, str(result), handler_name))
else:
# 正常执行结果
if not result.handler_name:
# 补充handler_name
result.handler_name = handler_name
processed_results.append(result)
return HandlerResultsCollection(processed_results)
async def _execute_subscriber(self, subscriber, params: dict) -> HandlerResult:
"""执行单个订阅者处理器"""
try:
return await subscriber.execute(params)
except Exception as e:
# 异常会在 gather 中捕获,这里直接抛出让 gather 处理
raise e

View File

@@ -2,7 +2,6 @@
事件管理器 - 实现Event和EventHandler的单例管理 事件管理器 - 实现Event和EventHandler的单例管理
提供统一的事件注册、管理和触发接口 提供统一的事件注册、管理和触发接口
""" """
import asyncio
from typing import Dict, Type, List, Optional, Any, Union from typing import Dict, Type, List, Optional, Any, Union
from threading import Lock from threading import Lock
@@ -39,7 +38,6 @@ class EventManager:
self._event_handlers: Dict[str, Type[BaseEventHandler]] = {} self._event_handlers: Dict[str, Type[BaseEventHandler]] = {}
self._pending_subscriptions: Dict[str, List[str]] = {} # 缓存失败的订阅 self._pending_subscriptions: Dict[str, List[str]] = {} # 缓存失败的订阅
self._initialized = True self._initialized = True
self.event_handle_lock = asyncio.Lock()
logger.info("EventManager 单例初始化完成") logger.info("EventManager 单例初始化完成")
def register_event(self, event_name: Union[EventType, str]) -> bool: def register_event(self, event_name: Union[EventType, str]) -> bool:
@@ -282,8 +280,8 @@ class EventManager:
if event is None: if event is None:
logger.error(f"事件 {event_name} 不存在,无法触发") logger.error(f"事件 {event_name} 不存在,无法触发")
return None return None
async with self.event_handle_lock:
return await event.activate(params) return await event.activate(params)
def init_default_events(self) -> None: def init_default_events(self) -> None:
"""初始化默认事件""" """初始化默认事件"""
@@ -295,8 +293,7 @@ class EventManager:
EventType.POST_LLM, EventType.POST_LLM,
EventType.AFTER_LLM, EventType.AFTER_LLM,
EventType.POST_SEND, EventType.POST_SEND,
EventType.AFTER_SEND, EventType.AFTER_SEND
EventType.UNKNOWN
] ]
for event_name in default_events: for event_name in default_events: