Merge branch 'master' of https://github.com/MoFox-Studio/MoFox_Bot
This commit is contained in:
@@ -100,7 +100,7 @@ class CycleProcessor:
|
||||
from src.plugin_system.core.event_manager import event_manager
|
||||
from src.plugin_system.base.component_types import EventType
|
||||
# 触发 ON_PLAN 事件
|
||||
result = await event_manager.trigger_event(EventType.ON_PLAN, stream_id=self.context.stream_id)
|
||||
result = await event_manager.trigger_event(EventType.ON_PLAN, plugin_name="SYSTEM", stream_id=self.context.stream_id)
|
||||
if result and not result.all_continue_process():
|
||||
return
|
||||
|
||||
|
||||
@@ -437,7 +437,7 @@ class ChatBot:
|
||||
logger.info(f"命令处理完成,跳过后续消息处理: {cmd_result}")
|
||||
return
|
||||
|
||||
result = await event_manager.trigger_event(EventType.ON_MESSAGE,message=message)
|
||||
result = await event_manager.trigger_event(EventType.ON_MESSAGE,plugin_name="SYSTEM",message=message)
|
||||
if not result.all_continue_process():
|
||||
raise UserWarning(f"插件{result.get_summary().get('stopped_handlers','')}于消息到达时取消了消息处理")
|
||||
|
||||
|
||||
@@ -370,7 +370,7 @@ class DefaultReplyer:
|
||||
from src.plugin_system.core.event_manager import event_manager
|
||||
|
||||
if not from_plugin:
|
||||
result = await event_manager.trigger_event(EventType.POST_LLM,prompt=prompt,stream_id=stream_id)
|
||||
result = await event_manager.trigger_event(EventType.POST_LLM,plugin_name="SYSTEM",prompt=prompt,stream_id=stream_id)
|
||||
if not result.all_continue_process():
|
||||
raise UserWarning(f"插件{result.get_summary().get('stopped_handlers', '')}于请求前中断了内容生成")
|
||||
|
||||
@@ -390,7 +390,7 @@ class DefaultReplyer:
|
||||
}
|
||||
# 触发 AFTER_LLM 事件
|
||||
if not from_plugin:
|
||||
result = await event_manager.trigger_event(EventType.AFTER_LLM,prompt=prompt,llm_response=llm_response,stream_id=stream_id)
|
||||
result = await event_manager.trigger_event(EventType.AFTER_LLM,plugin_name="SYSTEM",prompt=prompt,llm_response=llm_response,stream_id=stream_id)
|
||||
if not result.all_continue_process():
|
||||
raise UserWarning(f"插件{result.get_summary().get('stopped_handlers','')}于请求后取消了内容生成")
|
||||
except UserWarning as e:
|
||||
|
||||
@@ -254,7 +254,7 @@ MoFox_Bot(第三方修改版)
|
||||
|
||||
|
||||
try:
|
||||
await event_manager.trigger_event(EventType.ON_START)
|
||||
await event_manager.trigger_event(EventType.ON_START,plugin_name="SYSTEM")
|
||||
init_time = int(1000 * (time.time() - init_start_time))
|
||||
logger.info(f"初始化完成,神经元放电{init_time}次")
|
||||
except Exception as e:
|
||||
|
||||
@@ -1,19 +1,20 @@
|
||||
import asyncio
|
||||
from typing import List, Dict, Any, Optional
|
||||
from src.common.logger import get_logger
|
||||
|
||||
logger = get_logger("base_event")
|
||||
|
||||
|
||||
class HandlerResult:
|
||||
"""事件处理器执行结果
|
||||
|
||||
所有事件处理器必须返回此类的实例
|
||||
"""
|
||||
def __init__(self, success: bool, continue_process: bool, message: str = "", handler_name: str = ""):
|
||||
def __init__(self, success: bool, continue_process: bool, message: Any = {}, handler_name: str = ""):
|
||||
self.success = success
|
||||
self.continue_process = continue_process
|
||||
self.message = message
|
||||
self.handler_name = handler_name
|
||||
|
||||
|
||||
def __repr__(self):
|
||||
return f"HandlerResult(success={self.success}, continue_process={self.continue_process}, message='{self.message}', handler_name='{self.handler_name}')"
|
||||
|
||||
@@ -66,13 +67,22 @@ class HandlerResultsCollection:
|
||||
}
|
||||
|
||||
class BaseEvent:
|
||||
def __init__(self, name: str):
|
||||
def __init__(
|
||||
self,
|
||||
name: str,
|
||||
allowed_subscribers: List[str]=[],
|
||||
allowed_triggers: List[str]=[]
|
||||
):
|
||||
self.name = name
|
||||
self.enabled = True
|
||||
self.allowed_subscribers = allowed_subscribers # 记录事件处理器名
|
||||
self.allowed_triggers = allowed_triggers # 记录插件名
|
||||
|
||||
from src.plugin_system.base.base_events_handler import BaseEventHandler
|
||||
self.subscribers: List["BaseEventHandler"] = [] # 订阅该事件的事件处理器列表
|
||||
|
||||
self.event_handle_lock = asyncio.Lock()
|
||||
|
||||
def __name__(self):
|
||||
return self.name
|
||||
|
||||
@@ -88,22 +98,45 @@ class BaseEvent:
|
||||
if not self.enabled:
|
||||
return HandlerResultsCollection([])
|
||||
|
||||
# 按权重从高到低排序订阅者
|
||||
# 使用直接属性访问,-1代表自动权重
|
||||
sorted_subscribers = sorted(self.subscribers, key=lambda h: h.weight if hasattr(h, 'weight') and h.weight != -1 else 0, reverse=True)
|
||||
|
||||
results = []
|
||||
for subscriber in sorted_subscribers:
|
||||
try:
|
||||
result = await subscriber.execute(params)
|
||||
if not result.handler_name:
|
||||
# 补充handler_name
|
||||
result.handler_name = subscriber.handler_name if hasattr(subscriber, 'handler_name') else subscriber.__class__.__name__
|
||||
results.append(result)
|
||||
except Exception as e:
|
||||
# 处理执行异常
|
||||
# 使用锁确保同一个事件不能同时激活多次
|
||||
async with self.event_handle_lock:
|
||||
# 按权重从高到低排序订阅者
|
||||
# 使用直接属性访问,-1代表自动权重
|
||||
sorted_subscribers = sorted(self.subscribers, key=lambda h: h.weight if hasattr(h, 'weight') and h.weight != -1 else 0, reverse=True)
|
||||
|
||||
# 并行执行所有订阅者
|
||||
tasks = []
|
||||
for subscriber in sorted_subscribers:
|
||||
# 为每个订阅者创建执行任务
|
||||
task = self._execute_subscriber(subscriber, params)
|
||||
tasks.append(task)
|
||||
|
||||
# 等待所有任务完成
|
||||
results = await asyncio.gather(*tasks, return_exceptions=True)
|
||||
|
||||
# 处理执行结果
|
||||
processed_results = []
|
||||
for i, result in enumerate(results):
|
||||
subscriber = sorted_subscribers[i]
|
||||
handler_name = subscriber.handler_name if hasattr(subscriber, 'handler_name') else subscriber.__class__.__name__
|
||||
logger.error(f"事件处理器 {handler_name} 执行失败: {e}")
|
||||
results.append(HandlerResult(False, True, str(e), handler_name))
|
||||
|
||||
return HandlerResultsCollection(results)
|
||||
|
||||
if isinstance(result, Exception):
|
||||
# 处理执行异常
|
||||
logger.error(f"事件处理器 {handler_name} 执行失败: {result}")
|
||||
processed_results.append(HandlerResult(False, True, str(result), handler_name))
|
||||
else:
|
||||
# 正常执行结果
|
||||
if not result.handler_name:
|
||||
# 补充handler_name
|
||||
result.handler_name = handler_name
|
||||
processed_results.append(result)
|
||||
|
||||
return HandlerResultsCollection(processed_results)
|
||||
|
||||
async def _execute_subscriber(self, subscriber, params: dict) -> HandlerResult:
|
||||
"""执行单个订阅者处理器"""
|
||||
try:
|
||||
return await subscriber.execute(params)
|
||||
except Exception as e:
|
||||
# 异常会在 gather 中捕获,这里直接抛出让 gather 处理
|
||||
raise e
|
||||
@@ -2,7 +2,6 @@
|
||||
事件管理器 - 实现Event和EventHandler的单例管理
|
||||
提供统一的事件注册、管理和触发接口
|
||||
"""
|
||||
|
||||
from typing import Dict, Type, List, Optional, Any, Union
|
||||
from threading import Lock
|
||||
|
||||
@@ -41,12 +40,18 @@ class EventManager:
|
||||
self._initialized = True
|
||||
logger.info("EventManager 单例初始化完成")
|
||||
|
||||
def register_event(self, event_name: Union[EventType, str]) -> bool:
|
||||
def register_event(
|
||||
self,
|
||||
event_name: Union[EventType, str],
|
||||
allowed_subscribers: List[str]=[],
|
||||
allowed_triggers: List[str]=[]
|
||||
) -> bool:
|
||||
"""注册一个新的事件
|
||||
|
||||
Args:
|
||||
event_name Union[EventType, str]: 事件名称
|
||||
|
||||
allowed_subscribers: List[str]: 事件订阅者白名单,
|
||||
allowed_triggers: List[str]: 事件触发插件白名单
|
||||
Returns:
|
||||
bool: 注册成功返回True,已存在返回False
|
||||
"""
|
||||
@@ -54,7 +59,7 @@ class EventManager:
|
||||
logger.warning(f"事件 {event_name} 已存在,跳过注册")
|
||||
return False
|
||||
|
||||
event = BaseEvent(event_name)
|
||||
event = BaseEvent(event_name,allowed_subscribers,allowed_triggers)
|
||||
self._events[event_name] = event
|
||||
logger.info(f"事件 {event_name} 注册成功")
|
||||
|
||||
@@ -211,7 +216,12 @@ class EventManager:
|
||||
if handler_instance in event.subscribers:
|
||||
logger.warning(f"事件处理器 {handler_name} 已经订阅了事件 {event_name},跳过重复订阅")
|
||||
return True
|
||||
|
||||
|
||||
# 白名单检查
|
||||
if event.allowed_subscribers and handler_name not in event.allowed_subscribers:
|
||||
logger.warning(f"事件处理器 {handler_name} 不在事件 {event_name} 的订阅者白名单中,无法订阅")
|
||||
return False
|
||||
|
||||
event.subscribers.append(handler_instance)
|
||||
|
||||
# 按权重从高到低排序订阅者
|
||||
@@ -265,11 +275,12 @@ class EventManager:
|
||||
|
||||
return {handler.handler_name: handler for handler in event.subscribers}
|
||||
|
||||
async def trigger_event(self, event_name: Union[EventType, str], **kwargs) -> Optional[HandlerResultsCollection]:
|
||||
async def trigger_event(self, event_name: Union[EventType, str], plugin_name: Optional[str]="", **kwargs) -> Optional[HandlerResultsCollection]:
|
||||
"""触发指定事件
|
||||
|
||||
Args:
|
||||
event_name Union[EventType, str]: 事件名称
|
||||
plugin_name str: 触发事件的插件名
|
||||
**kwargs: 传递给处理器的参数
|
||||
|
||||
Returns:
|
||||
@@ -281,7 +292,15 @@ class EventManager:
|
||||
if event is None:
|
||||
logger.error(f"事件 {event_name} 不存在,无法触发")
|
||||
return None
|
||||
|
||||
|
||||
# 插件白名单检查
|
||||
if event.allowed_triggers and not plugin_name:
|
||||
logger.warning(f"事件 {event_name} 存在触发者白名单,缺少plugin_name无法验证权限,已拒绝触发!")
|
||||
return None
|
||||
elif event.allowed_triggers and plugin_name not in event.allowed_triggers:
|
||||
logger.warning(f"插件 {plugin_name} 没有权限触发事件 {event_name},已拒绝触发!")
|
||||
return None
|
||||
|
||||
return await event.activate(params)
|
||||
|
||||
def init_default_events(self) -> None:
|
||||
@@ -294,12 +313,11 @@ class EventManager:
|
||||
EventType.POST_LLM,
|
||||
EventType.AFTER_LLM,
|
||||
EventType.POST_SEND,
|
||||
EventType.AFTER_SEND,
|
||||
EventType.UNKNOWN
|
||||
EventType.AFTER_SEND
|
||||
]
|
||||
|
||||
for event_name in default_events:
|
||||
self.register_event(event_name)
|
||||
self.register_event(event_name,allowed_triggers=["SYSTEM"])
|
||||
|
||||
logger.info("默认事件初始化完成")
|
||||
|
||||
|
||||
Reference in New Issue
Block a user