Files
Mofox-Core/docs/message_runtime_architecture.md

8.9 KiB
Raw Permalink Blame History

MoFox Bot 消息运行时架构 (MessageRuntime)

本文档描述了 MoFox Bot 使用 mofox_wire.MessageRuntime 简化消息处理链条的架构设计。

架构概述

┌─────────────────────────────────────────────────────────────────────────┐
│                           CoreSinkManager                                │
│  ┌─────────────────────────────────────────────────────────────────────┐│
│  │                        MessageRuntime                                ││
│  │  ┌──────────────┐  ┌──────────────┐  ┌──────────────┐               ││
│  │  │ before_hook  │→ │   Routes     │→ │ after_hook   │               ││
│  │  │ (预处理/过滤) │  │ (消息路由)   │  │ (后处理)     │               ││
│  │  └──────────────┘  └──────────────┘  └──────────────┘               ││
│  │         ↓                 ↓                 ↓                        ││
│  │  ┌──────────────────────────────────────────────────────────────┐   ││
│  │  │                     error_hook (错误处理)                      │   ││
│  │  └──────────────────────────────────────────────────────────────┘   ││
│  └─────────────────────────────────────────────────────────────────────┘│
│                                                                          │
│  ┌──────────────────────┐   ┌──────────────────────────────────────┐    │
│  │ InProcessCoreSink    │   │ ProcessCoreSinkServer (子进程适配器)  │    │
│  │ (同进程适配器)        │   │                                      │    │
│  └──────────────────────┘   └──────────────────────────────────────┘    │
└─────────────────────────────────────────────────────────────────────────┘
              ↑                                    ↑
              │                                    │
    ┌─────────────────────┐            ┌─────────────────────┐
    │    同进程适配器      │            │     子进程适配器      │
    │ (run_in_subprocess  │            │ (run_in_subprocess   │
    │  = False)           │            │  = True)             │
    └─────────────────────┘            └─────────────────────┘

核心组件

1. CoreSinkManager (src/common/core_sink_manager.py)

统一管理 CoreSink 双实例和 MessageRuntime

from src.common.core_sink_manager import get_core_sink_manager, get_message_runtime

# 获取管理器
manager = get_core_sink_manager()

# 获取 MessageRuntime
runtime = get_message_runtime()

# 发送消息到适配器
await manager.send_outgoing(envelope)

2. MessageRuntime

MessageRuntime 是 mofox_wire 提供的消息路由核心,支持:

  • 消息路由:通过 add_route()@on_message 装饰器按消息类型路由
  • 钩子机制before_hook(前置处理)、after_hook(后置处理)、error_hook(错误处理)
  • 中间件:洋葱模型的中间件机制
  • 批量处理:支持 handle_batch() 批量处理消息

3. MessageHandler (src/chat/message_receive/message_handler.py)

将消息处理逻辑注册为 MessageRuntime 的路由和钩子:

class MessageHandler:
    def register_handlers(self, runtime: MessageRuntime) -> None:
        # 注册前置钩子
        runtime.register_before_hook(self._before_hook)
        
        # 注册后置钩子
        runtime.register_after_hook(self._after_hook)
        
        # 注册错误钩子
        runtime.register_error_hook(self._error_hook)
        
        # 注册适配器响应处理器
        runtime.add_route(
            predicate=_is_adapter_response,
            handler=self._handle_adapter_response_route,
            name="adapter_response_handler",
            message_type="adapter_response",
        )
        
        # 注册默认消息处理器
        runtime.add_route(
            predicate=lambda _: True,
            handler=self._handle_normal_message,
            name="default_message_handler",
        )

消息流向

接收消息

适配器 → InProcessCoreSink/ProcessCoreSinkServer → CoreSinkManager._dispatch_to_runtime()
       → MessageRuntime.handle_message()
       → before_hook (预处理、过滤)
       → 匹配路由 (adapter_response / normal_message)
       → 执行处理器
       → after_hook (后处理)

发送消息

消息发送请求 → CoreSinkManager.send_outgoing()
           → InProcessCoreSink.push_outgoing()
           → ProcessCoreSinkServer.push_outgoing()
           → 适配器

钩子功能

before_hook

在消息路由之前执行,用于:

  • 标准化 ID 为字符串
  • 检查 echo 消息(自身消息上报)
  • 通过抛出 UserWarning 跳过消息处理

after_hook

在消息处理完成后执行,用于:

  • 清理工作
  • 日志记录

error_hook

在处理过程中出现异常时执行,用于:

  • 区分预期的流程控制UserWarning和真正的错误
  • 统一异常日志记录

路由优先级

  1. 明确指定 message_type 的路由(优先级最高)
  2. 事件路由(基于 event_type
  3. 通用路由(无 message_type 限制)

扩展消息处理

注册自定义处理器

from src.common.core_sink_manager import get_message_runtime
from mofox_wire import MessageEnvelope

runtime = get_message_runtime()

# 使用装饰器
@runtime.on_message(message_type="image")
async def handle_image(envelope: MessageEnvelope):
    # 处理图片消息
    pass

# 或使用 add_route
runtime.add_route(
    predicate=lambda env: env.get("platform") == "qq",
    handler=my_handler,
    name="qq_handler",
)

注册钩子

runtime = get_message_runtime()

# 前置钩子
async def my_before_hook(envelope: MessageEnvelope) -> None:
    # 预处理逻辑
    pass

runtime.register_before_hook(my_before_hook)

# 错误钩子
async def my_error_hook(envelope: MessageEnvelope, exc: BaseException) -> None:
    # 错误处理逻辑
    pass

runtime.register_error_hook(my_error_hook)

初始化流程

MainSystem.initialize() 中:

  1. 初始化 CoreSinkManager(包含 MessageRuntime
  2. 获取 MessageHandler 并设置 CoreSinkManager 引用
  3. 调用 MessageHandler.register_handlers()MessageRuntime 注册处理器和钩子
  4. 初始化其他组件
async def initialize(self) -> None:
    # 初始化 CoreSinkManager包含 MessageRuntime
    self.core_sink_manager = await initialize_core_sink_manager()
    
    # 获取 MessageHandler 并向 MessageRuntime 注册处理器
    self.message_handler = get_message_handler()
    self.message_handler.set_core_sink_manager(self.core_sink_manager)
    self.message_handler.register_handlers(self.core_sink_manager.runtime)

优势

  1. 简化消息处理链:不再需要手动管理处理流程,使用声明式路由
  2. 更好的可扩展性:通过 add_route() 或装饰器轻松添加新的处理器
  3. 统一的错误处理:通过 error_hook 集中处理异常
  4. 支持中间件:可以添加洋葱模型的中间件
  5. 更清晰的代码结构:处理逻辑按类型分离

参考

  • packages/mofox-wire/src/mofox_wire/runtime.py - MessageRuntime 实现
  • src/common/core_sink_manager.py - CoreSinkManager 实现
  • src/chat/message_receive/message_handler.py - MessageHandler 实现
  • docs/mofox_wire.md - MoFox Bus 消息库说明