From b697af4d3a6e293e9d24a2502fd78a71dd361541 Mon Sep 17 00:00:00 2001 From: Windpicker-owo Date: Mon, 15 Sep 2025 11:50:34 +0800 Subject: [PATCH] =?UTF-8?q?=E6=9B=B4=E6=96=B0event=20doc?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- docs/plugins/event-system-guide.md | 350 +++++++++++++++++++++-------- 1 file changed, 252 insertions(+), 98 deletions(-) diff --git a/docs/plugins/event-system-guide.md b/docs/plugins/event-system-guide.md index 2ef93aa23..f2aa58f71 100644 --- a/docs/plugins/event-system-guide.md +++ b/docs/plugins/event-system-guide.md @@ -1,48 +1,56 @@ + # 事件系统使用指南 ## 概述 -本项目的事件系统是一个基于插件架构的异步事件处理框架,允许插件通过事件驱动的方式进行通信和协作。事件系统采用发布-订阅模式,支持动态事件注册、处理器管理、权重排序和链式处理。 +本项目的事件系统是一个基于插件架构的异步事件处理框架,允许插件通过事件驱动的方式进行通信和协作。事件系统采用发布-订阅模式,支持动态事件注册、处理器管理、权重排序、链式处理和细粒度鉴权机制。 ## 核心概念 ### 事件 (Event) -事件是系统中发生的特定动作或状态变化,可以被多个处理器监听和响应。 +事件是系统中发生的特定动作或状态变化,可以被多个处理器监听和响应。每个事件可以配置订阅者和触发者的白名单权限。 ### 事件处理器 (Event Handler) -事件处理器是响应特定事件的代码单元,可以订阅一个或多个事件。 +事件处理器是响应特定事件的代码单元,可以订阅一个或多个事件。处理器支持权重排序和链式处理控制。 ### 事件管理器 (Event Manager) -事件管理器是事件系统的核心,负责事件的注册、处理器的管理以及事件的触发。 +事件管理器是事件系统的核心,负责事件的注册、处理器的管理、权限验证以及事件的触发。 + +### 鉴权机制 (Authentication Mechanism) +系统提供双重鉴权机制: +- **订阅者白名单** (`allowed_subscribers`): 控制哪些处理器可以订阅事件 +- **触发者白名单** (`allowed_triggers`): 控制哪些插件可以触发事件 ## 系统架构 ```mermaid graph TD A[插件系统] --> B[事件管理器] - B --> C[事件注册] - B --> D[处理器注册] - B --> E[事件触发] + B --> C[事件注册与鉴权] + B --> D[处理器注册与鉴权] + B --> E[事件触发与权限验证] C --> F[BaseEvent实例] D --> G[BaseEventHandler实例] E --> H[处理器执行] - H --> I[结果收集] + H --> I[结果收集与汇总] + C --> J[订阅者白名单验证] + E --> K[触发者白名单验证] ``` ## 内置事件类型 系统预定义了以下事件类型: -| 事件名称 | 描述 | 触发时机 | -|---------|------|----------| -| `on_start` | 启动事件 | 系统启动时 | -| `on_stop` | 停止事件 | 系统停止时 | -| `on_message` | 消息事件 | 收到新消息时 | -| `on_plan` | 计划事件 | 执行计划任务时 | -| `post_llm` | LLM后处理事件 | LLM处理完成后 | -| `after_llm` | LLM后事件 | LLM响应后 | -| `post_send` | 发送后处理事件 | 消息发送后 | -| `after_send` | 发送后事件 | 消息完全发送后 | +| 事件名称 | 描述 | 触发时机 | 默认权限 | +|---------|------|----------|----------| +| `on_start` | 启动事件 | 系统启动时 | SYSTEM | +| `on_stop` | 停止事件 | 系统停止时 | SYSTEM | +| `on_message` | 消息事件 | 收到新消息时 | SYSTEM | +| `on_plan` | 计划事件 | 执行计划任务时 | SYSTEM | +| `post_llm` | 准备LLM事件 | 准备LLM时 | SYSTEM | +| `after_llm` | LLM后事件 | LLM响应后 | SYSTEM | +| `post_send` | 准备发送消息事件 | 准备发送消息时 | SYSTEM | +| `after_send` | 发送后事件 | 消息完全发送后 | SYSTEM | ## 快速开始 @@ -72,7 +80,7 @@ class MyEventHandler(BaseEventHandler): return HandlerResult( success=True, - continue_process=True, # 是否继续让其他处理器处理 + continue_process=True, # 是否阻断后续流程 message="处理成功", handler_name=self.handler_name ) @@ -104,47 +112,178 @@ class MyPlugin(BasePlugin): ### 3. 触发事件 -使用事件管理器触发事件: +使用事件管理器触发事件,支持权限验证: ```python from src.plugin_system.core.event_manager import event_manager -# 触发内置事件 -await event_manager.trigger_event(EventType.ON_MESSAGE, message="Hello World") +# 触发内置事件(需要SYSTEM权限) +await event_manager.trigger_event(EventType.ON_MESSAGE, permission_group="SYSTEM", message="Hello World") -# 触发自定义事件 -await event_manager.trigger_event("custom_event", data={"key": "value"}) +# 触发自定义事件(需要相应权限) +await event_manager.trigger_event("custom_event", permission_group="my_plugin", data={"key": "value"}) +``` + +## 鉴权机制详解 + +### 事件注册时的权限控制 + +注册事件时可以指定订阅者和触发者的白名单: + +```python +# 注册事件,限制只有特定处理器可以订阅,特定插件可以触发 +event_manager.register_event( + "sensitive_event", + allowed_subscribers=["audit_handler", "log_handler"], # 订阅者白名单 + allowed_triggers=["security_plugin", "admin_plugin"] # 触发者白名单 +) +``` + +### 权限验证流程 + +1. **订阅权限验证**:处理器订阅事件时检查 `allowed_subscribers` +2. **触发权限验证**:插件触发事件时检查 `allowed_triggers` +3. **默认权限**:内置事件默认只允许 `SYSTEM` 权限组触发 + +### 权限组说明 + +- `SYSTEM`: 系统核心组件权限 +- `插件名称`: 各个插件的权限标识 +- 空字符串: 无权限组(无法触发有白名单的事件) + +## 使用模式 + +### 接口式模式(内部编写handler,外部触发) + +**适用场景**:插件提供事件处理能力,供其他组件调用 + +```python +# 服务提供者插件 +class DataProcessorHandler(BaseEventHandler): + handler_name = "data_processor" + handler_description = "数据处理服务" + weight = 10 # 权重,越大越先执行 + intercept_message = False # 是否拦截消息 + init_subscribe = ["process_data_request"] + + async def execute(self, params: dict) -> HandlerResult: + data = params.get("data") + processed = self.process_data(data) + return HandlerResult(True, True, processed) + +# 服务消费者插件 +async def use_data_service(): + result = await event_manager.trigger_event( + "process_data_request", + permission_group="consumer_plugin", + data={"input": "test"} + ) + if result: + processed_data = result.get_message_result() +``` + +### 通知式模式(外部编写handler,内部触发) + +**适用场景**:插件内部发生事件,通知外部处理器 + +```python +# 事件生产者插件 +class EventProducerPlugin(BasePlugin): + def __init__(self): + # 注册自定义事件,允许其他处理器订阅 + event_manager.register_event("custom_alert") + + async def detect_anomaly(self): + if anomaly_detected: + # 触发事件通知订阅者 + await event_manager.trigger_event( + "custom_alert", + permission_group=self.plugin_name, + anomaly_type="security", + severity="high" + ) + +# 事件消费者插件 +class AlertHandler(BaseEventHandler): + handler_name = "alert_handler" + init_subscribe = ["custom_alert"] + + async def execute(self, params: dict) -> HandlerResult: + anomaly_type = params.get("anomaly_type") + severity = params.get("severity") + self.handle_alert(anomaly_type, severity) + return HandlerResult(True, True, "Alert handled") ``` ## 高级用法 -### 动态事件管理 +### 动态订阅管理 -#### 注册自定义事件 -```python -# 注册新事件 -event_manager.register_event("my_custom_event") - -# 检查事件是否存在 -event = event_manager.get_event("my_custom_event") -``` - -#### 动态订阅管理 ```python # 动态订阅处理器到事件 -event_manager.subscribe_handler_to_event("handler_name", "event_name") +success = event_manager.subscribe_handler_to_event("handler_name", "event_name") # 取消订阅 -event_manager.unsubscribe_handler_from_event("handler_name", "event_name") +success = event_manager.unsubscribe_handler_from_event("handler_name", "event_name") + +# 处理器自管理订阅 +class DynamicHandler(BaseEventHandler): + async def setup_subscriptions(self): + self.subscribe("event1") + self.subscribe("event2") + + async def cleanup(self): + self.unsubscribe("event1") ``` -#### 启用/禁用事件 -```python -# 禁用事件 -event_manager.disable_event("event_name") +### 参数传递机制 -# 启用事件 -event_manager.enable_event("event_name") +事件支持灵活的参数传递: + +```python +# 触发事件时传递复杂参数 +await event_manager.trigger_event( + "complex_event", + permission_group="my_plugin", + user_info={"id": 123, "name": "test"}, + metadata={"timestamp": "2024-01-01", "source": "api"}, + nested_data={"level1": {"level2": "value"}} +) + +# 处理器接收参数 +async def execute(self, params: dict) -> HandlerResult: + user_info = params.get("user_info", {}) + metadata = params.get("metadata", {}) + # 处理参数... +``` + +### 结果汇总与处理 + +事件触发后返回 `HandlerResultsCollection`,提供丰富的查询方法: + +```python +results = await event_manager.trigger_event("my_event", permission_group="my_plugin", data=data) + +# 获取处理摘要 +summary = results.get_summary() +print(f"总处理器数: {summary['total_handlers']}") +print(f"成功数: {summary['success_count']}") +print(f"失败数: {summary['failure_count']}") +print(f"失败处理器: {summary['failed_handlers']}") + +# 获取特定处理器结果 +specific_result = results.get_handler_result("my_handler") +if specific_result and specific_result.success: + print(f"处理器结果: {specific_result.message}") + +# 检查处理链状态 +if results.all_continue_process(): + print("所有处理器都允许继续处理") +else: + print("有处理器中断了处理链") + +# 获取所有消息结果 +all_messages = results.get_message_result() ``` ### 事件处理器权重 @@ -152,11 +291,14 @@ event_manager.enable_event("event_name") 事件处理器支持权重机制,权重越高的处理器越先执行: ```python -class HighPriorityHandler(BaseEventHandler): - weight = 100 # 高优先级 +class CriticalHandler(BaseEventHandler): + weight = 100 # 高优先级,最先执行 -class LowPriorityHandler(BaseEventHandler): - weight = 1 # 低优先级 +class NormalHandler(BaseEventHandler): + weight = 50 # 中等优先级 + +class BackgroundHandler(BaseEventHandler): + weight = 1 # 低优先级,最后执行 ``` ### 事件链式处理 @@ -171,28 +313,9 @@ class FilterHandler(BaseEventHandler): return HandlerResult(True, True, "继续处理") ``` -### 事件结果处理 - -事件触发后返回 `HandlerResultsCollection`,可以获取详细的处理结果: - -```python -results = await event_manager.trigger_event("my_event", data=data) - -# 获取处理摘要 -summary = results.get_summary() -print(f"总处理器数: {summary['total_handlers']}") -print(f"成功数: {summary['success_count']}") -print(f"失败处理器: {summary['failed_handlers']}") - -# 获取特定处理器结果 -result = results.get_handler_result("my_handler") -if result and result.success: - print("处理器执行成功") -``` - ## 完整示例 -### 示例1:消息监控插件 +### 示例1:消息监控插件(带权限控制) ```python from src.plugin_system import BasePlugin, BaseEventHandler, register_plugin, EventType @@ -218,9 +341,10 @@ class MessageMonitorHandler(BaseEventHandler): # 关键词检测 if "重要" in str(message): self.keyword_hits += 1 - # 触发特殊事件 + # 触发特殊事件(需要相应权限) await event_manager.trigger_event( "important_message_detected", + permission_group=self.plugin_name, message=message, count=self.keyword_hits ) @@ -247,18 +371,21 @@ class MessageMonitorPlugin(BasePlugin): def __init__(self, *args, **kwargs): super().__init__(*args, **kwargs) - # 注册自定义事件 - event_manager.register_event("important_message_detected") + # 注册自定义事件,设置权限控制 + event_manager.register_event( + "important_message_detected", + allowed_subscribers=["important_handler", "audit_handler"], # 只允许特定处理器订阅 + allowed_triggers=["message_monitor"] # 只允许本插件触发 + ) def get_plugin_components(self): return [ (MessageMonitorHandler.get_handler_info(), MessageMonitorHandler), (ImportantMessageHandler.get_handler_info(), ImportantMessageHandler), ] - ``` -### 示例2:系统监控插件 +### 示例2:系统监控插件(带结果汇总) ```python import asyncio @@ -284,12 +411,19 @@ class SystemMonitorHandler(BaseEventHandler): memory = psutil.virtual_memory() if cpu_percent > 80 or memory.percent > 80: - await event_manager.trigger_event( + # 触发系统警报事件 + result = await event_manager.trigger_event( "system_alert", + permission_group=self.plugin_name, cpu_percent=cpu_percent, memory_percent=memory.percent, timestamp=datetime.now() ) + + # 处理结果汇总 + if result: + summary = result.get_summary() + print(f"警报处理结果: {summary['success_count']}成功, {summary['failure_count']}失败") await asyncio.sleep(30) @@ -303,66 +437,86 @@ class AlertHandler(BaseEventHandler): async def execute(self, params): cpu = params.get("cpu_percent") memory = params.get("memory_percent") - print(f"🚨 系统警报: CPU {cpu}%, 内存 {memory}%") + timestamp = params.get("timestamp") + print(f"🚨 系统警报({timestamp}): CPU {cpu}%, 内存 {memory}%") return HandlerResult(True, True, "警报已处理") +class AlertNotifierHandler(BaseEventHandler): + handler_name = "alert_notifier" + handler_description = "通知系统警报" + weight = 15 + intercept_message = False + init_subscribe = ["system_alert"] + + async def execute(self, params): + cpu = params.get("cpu_percent") + memory = params.get("memory_percent") + # 发送通知... + return HandlerResult(True, True, "通知已发送") + @register_plugin class SystemMonitorPlugin(BasePlugin): plugin_name = "system_monitor" def __init__(self, *args, **kwargs): super().__init__(*args, **kwargs) + # 注册系统警报事件 event_manager.register_event("system_alert") def get_plugin_components(self): return [ (SystemMonitorHandler.get_handler_info(), SystemMonitorHandler), (AlertHandler.get_handler_info(), AlertHandler), + (AlertNotifierHandler.get_handler_info(), AlertNotifierHandler), ] - ``` ## 调试和监控 ### 查看事件系统状态 + ```python # 获取事件系统摘要 summary = event_manager.get_event_summary() print(f"事件总数: {summary['total_events']}") +print(f"启用事件: {summary['enabled_events']}") +print(f"禁用事件: {summary['disabled_events']}") print(f"处理器总数: {summary['total_handlers']}") print(f"事件列表: {summary['event_names']}") +print(f"处理器列表: {summary['handler_names']}") ``` ### 查看事件订阅情况 + ```python # 查看特定事件的订阅者 subscribers = event_manager.get_event_subscribers(EventType.ON_MESSAGE) for name, handler in subscribers.items(): print(f"订阅者: {name}, 权重: {handler.weight}") + +# 查看事件的权限设置 +event = event_manager.get_event("important_message_detected") +if event: + print(f"允许的订阅者: {event.allowed_subscribers}") + print(f"允许的触发者: {event.allowed_triggers}") ``` -## 最佳实践 +### 事件执行监控 -1. **权重设置**:合理设置处理器权重,避免权重冲突 -2. **错误处理**:始终在处理器中添加异常处理 -3. **性能考虑**:避免在处理器中执行耗时操作,可使用异步任务 -4. **事件命名**:使用清晰的事件名称,避免与内置事件冲突 -5. **资源清理**:在插件卸载时取消订阅相关事件 -6. **日志记录**:适当记录处理日志,便于调试和监控 +```python +# 监控事件执行性能 +import time -## 常见问题 - -### Q: 如何处理事件的执行顺序? -A: 通过设置处理器的 `weight` 属性来控制执行顺序,权重越高越先执行。 - -### Q: 如何阻止后续处理器执行? -A: 在处理器返回 `HandlerResult` 时设置 `continue_process=False`。 - -### Q: 如何动态注册事件? -A: 使用 `event_manager.register_event("event_name")` 方法注册新事件。 - -### Q: 如何获取事件处理结果? -A: 事件触发后会返回 `HandlerResultsCollection`,可以获取详细的处理结果和摘要信息。 - -### Q: 如何处理异步事件? -A: 所有事件处理器都是异步的,可以在 `execute` 方法中使用 `await` 进行异步操作。 \ No newline at end of file +async def monitored_trigger(event_name, **kwargs): + start_time = time.time() + results = await event_manager.trigger_event(event_name, **kwargs) + end_time = time.time() + + if results: + execution_time = end_time - start_time + summary = results.get_summary() + print(f"事件 {event_name} 执行时间: {execution_time:.3f}s") + print(f"处理器执行统计: {summary}") + + return results +``` \ No newline at end of file