8.9 KiB
8.9 KiB
MoFox Bot 消息运行时架构 (MessageRuntime)
本文档描述了 MoFox Bot 使用 mofox_wire.MessageRuntime 简化消息处理链条的架构设计。
架构概述
┌─────────────────────────────────────────────────────────────────────────┐
│ CoreSinkManager │
│ ┌─────────────────────────────────────────────────────────────────────┐│
│ │ MessageRuntime ││
│ │ ┌──────────────┐ ┌──────────────┐ ┌──────────────┐ ││
│ │ │ before_hook │→ │ Routes │→ │ after_hook │ ││
│ │ │ (预处理/过滤) │ │ (消息路由) │ │ (后处理) │ ││
│ │ └──────────────┘ └──────────────┘ └──────────────┘ ││
│ │ ↓ ↓ ↓ ││
│ │ ┌──────────────────────────────────────────────────────────────┐ ││
│ │ │ error_hook (错误处理) │ ││
│ │ └──────────────────────────────────────────────────────────────┘ ││
│ └─────────────────────────────────────────────────────────────────────┘│
│ │
│ ┌──────────────────────┐ ┌──────────────────────────────────────┐ │
│ │ InProcessCoreSink │ │ ProcessCoreSinkServer (子进程适配器) │ │
│ │ (同进程适配器) │ │ │ │
│ └──────────────────────┘ └──────────────────────────────────────┘ │
└─────────────────────────────────────────────────────────────────────────┘
↑ ↑
│ │
┌─────────────────────┐ ┌─────────────────────┐
│ 同进程适配器 │ │ 子进程适配器 │
│ (run_in_subprocess │ │ (run_in_subprocess │
│ = False) │ │ = True) │
└─────────────────────┘ └─────────────────────┘
核心组件
1. CoreSinkManager (src/common/core_sink_manager.py)
统一管理 CoreSink 双实例和 MessageRuntime:
from src.common.core_sink_manager import get_core_sink_manager, get_message_runtime
# 获取管理器
manager = get_core_sink_manager()
# 获取 MessageRuntime
runtime = get_message_runtime()
# 发送消息到适配器
await manager.send_outgoing(envelope)
2. MessageRuntime
MessageRuntime 是 mofox_wire 提供的消息路由核心,支持:
- 消息路由:通过
add_route()或@on_message装饰器按消息类型路由 - 钩子机制:
before_hook(前置处理)、after_hook(后置处理)、error_hook(错误处理) - 中间件:洋葱模型的中间件机制
- 批量处理:支持
handle_batch()批量处理消息
3. MessageHandler (src/chat/message_receive/message_handler.py)
将消息处理逻辑注册为 MessageRuntime 的路由和钩子:
class MessageHandler:
def register_handlers(self, runtime: MessageRuntime) -> None:
# 注册前置钩子
runtime.register_before_hook(self._before_hook)
# 注册后置钩子
runtime.register_after_hook(self._after_hook)
# 注册错误钩子
runtime.register_error_hook(self._error_hook)
# 注册适配器响应处理器
runtime.add_route(
predicate=_is_adapter_response,
handler=self._handle_adapter_response_route,
name="adapter_response_handler",
message_type="adapter_response",
)
# 注册默认消息处理器
runtime.add_route(
predicate=lambda _: True,
handler=self._handle_normal_message,
name="default_message_handler",
)
消息流向
接收消息
适配器 → InProcessCoreSink/ProcessCoreSinkServer → CoreSinkManager._dispatch_to_runtime()
→ MessageRuntime.handle_message()
→ before_hook (预处理、过滤)
→ 匹配路由 (adapter_response / normal_message)
→ 执行处理器
→ after_hook (后处理)
发送消息
消息发送请求 → CoreSinkManager.send_outgoing()
→ InProcessCoreSink.push_outgoing()
→ ProcessCoreSinkServer.push_outgoing()
→ 适配器
钩子功能
before_hook
在消息路由之前执行,用于:
- 标准化 ID 为字符串
- 检查 echo 消息(自身消息上报)
- 通过抛出
UserWarning跳过消息处理
after_hook
在消息处理完成后执行,用于:
- 清理工作
- 日志记录
error_hook
在处理过程中出现异常时执行,用于:
- 区分预期的流程控制(UserWarning)和真正的错误
- 统一异常日志记录
路由优先级
- 明确指定 message_type 的路由(优先级最高)
- 事件路由(基于 event_type)
- 通用路由(无 message_type 限制)
扩展消息处理
注册自定义处理器
from src.common.core_sink_manager import get_message_runtime
from mofox_wire import MessageEnvelope
runtime = get_message_runtime()
# 使用装饰器
@runtime.on_message(message_type="image")
async def handle_image(envelope: MessageEnvelope):
# 处理图片消息
pass
# 或使用 add_route
runtime.add_route(
predicate=lambda env: env.get("platform") == "qq",
handler=my_handler,
name="qq_handler",
)
注册钩子
runtime = get_message_runtime()
# 前置钩子
async def my_before_hook(envelope: MessageEnvelope) -> None:
# 预处理逻辑
pass
runtime.register_before_hook(my_before_hook)
# 错误钩子
async def my_error_hook(envelope: MessageEnvelope, exc: BaseException) -> None:
# 错误处理逻辑
pass
runtime.register_error_hook(my_error_hook)
初始化流程
在 MainSystem.initialize() 中:
- 初始化
CoreSinkManager(包含MessageRuntime) - 获取
MessageHandler并设置CoreSinkManager引用 - 调用
MessageHandler.register_handlers()向MessageRuntime注册处理器和钩子 - 初始化其他组件
async def initialize(self) -> None:
# 初始化 CoreSinkManager(包含 MessageRuntime)
self.core_sink_manager = await initialize_core_sink_manager()
# 获取 MessageHandler 并向 MessageRuntime 注册处理器
self.message_handler = get_message_handler()
self.message_handler.set_core_sink_manager(self.core_sink_manager)
self.message_handler.register_handlers(self.core_sink_manager.runtime)
优势
- 简化消息处理链:不再需要手动管理处理流程,使用声明式路由
- 更好的可扩展性:通过
add_route()或装饰器轻松添加新的处理器 - 统一的错误处理:通过
error_hook集中处理异常 - 支持中间件:可以添加洋葱模型的中间件
- 更清晰的代码结构:处理逻辑按类型分离
参考
packages/mofox-wire/src/mofox_wire/runtime.py- MessageRuntime 实现src/common/core_sink_manager.py- CoreSinkManager 实现src/chat/message_receive/message_handler.py- MessageHandler 实现docs/mofox_wire.md- MoFox Bus 消息库说明