diff --git a/examples/mofox_bus_demo_adapter.py b/examples/mofox_bus_demo_adapter.py index 4a969d712..8c3d2090b 100644 --- a/examples/mofox_bus_demo_adapter.py +++ b/examples/mofox_bus_demo_adapter.py @@ -10,13 +10,10 @@ from __future__ import annotations import asyncio import time import uuid -from pathlib import Path -from typing import Any, Dict, Optional +from typing import Any import orjson import websockets - - from mofox_wire import ( AdapterBase, InProcessCoreSink, @@ -25,7 +22,6 @@ from mofox_wire import ( WebSocketAdapterOptions, ) - # --------------------------------------------------------------------------- # 1. 模拟一个提供 WebSocket 接口的平台 # --------------------------------------------------------------------------- @@ -93,7 +89,7 @@ class DemoWsAdapter(AdapterBase): # 继承AdapterBase # 实现 from_platform_message 方法,将平台消息转换为 MessageEnvelope # 该方法必须被实现以便 AdapterBase 正确处理消息转换 # 该方法会在adapter接收到平台消息后被调用 - def from_platform_message(self, raw: Dict[str, Any]) -> MessageEnvelope: + def from_platform_message(self, raw: dict[str, Any]) -> MessageEnvelope: return { "id": raw["message_id"], "direction": "incoming", @@ -151,7 +147,7 @@ async def handle_incoming(env: MessageEnvelope) -> MessageEnvelope: } -adapter: Optional[DemoWsAdapter] = None +adapter: DemoWsAdapter | None = None async def core_entry(message: MessageEnvelope) -> None: diff --git a/scripts/generate_missing_embeddings.py b/scripts/generate_missing_embeddings.py index ba50861dc..951db9cde 100644 --- a/scripts/generate_missing_embeddings.py +++ b/scripts/generate_missing_embeddings.py @@ -10,7 +10,7 @@ 使用方法: python scripts/generate_missing_embeddings.py [--node-types TOPIC,OBJECT] [--batch-size 50] - + 参数说明: --node-types: 需要生成嵌入的节点类型,默认为 TOPIC,OBJECT --batch-size: 批量处理大小,默认为 50 @@ -25,7 +25,7 @@ sys.path.insert(0, str(Path(__file__).parent.parent)) async def generate_missing_embeddings( - target_node_types: list[str] = None, + target_node_types: list[str] | None = None, batch_size: int = 50, ): """ diff --git a/scripts/lpmm_learning_tool.py b/scripts/lpmm_learning_tool.py index df9325616..aed8b5b21 100644 --- a/scripts/lpmm_learning_tool.py +++ b/scripts/lpmm_learning_tool.py @@ -174,12 +174,12 @@ def get_extraction_prompt(paragraph: str) -> str: async def extract_info_async(pg_hash, paragraph, llm_api): """ 异步提取单个段落的信息(带缓存支持) - + Args: pg_hash: 段落哈希值 paragraph: 段落文本 llm_api: LLM请求实例 - + Returns: tuple: (doc_item或None, failed_hash或None) """ @@ -231,15 +231,15 @@ async def extract_info_async(pg_hash, paragraph, llm_api): async def extract_information(paragraphs_dict, model_set): """ 🔧 优化:使用真正的异步并发代替多线程 - + 这样可以: 1. 避免 event loop closed 错误 2. 更高效地利用 I/O 资源 3. 与我们优化的 LLM 请求层无缝集成 - + 并发控制: - 使用信号量限制最大并发数为 5,防止触发 API 速率限制 - + Args: paragraphs_dict: {hash: paragraph} 字典 model_set: 模型配置 @@ -307,8 +307,8 @@ async def extract_information(paragraphs_dict, model_set): now = datetime.datetime.now() filename = now.strftime("%Y-%m-%d-%H-%M-%S-openie.json") output_path = os.path.join(OPENIE_OUTPUT_DIR, filename) - with open(output_path, "wb") as f: - f.write(orjson.dumps(openie_obj._to_dict())) + async with aiofiles.open(output_path, "wb") as f: + await f.write(orjson.dumps(openie_obj._to_dict())) logger.info(f"信息提取结果已保存到: {output_path}") logger.info(f"成功提取 {len(open_ie_docs)} 个段落的信息") diff --git a/src/chat/message_receive/message_handler.py b/src/chat/message_receive/message_handler.py index 82751d800..423139e56 100644 --- a/src/chat/message_receive/message_handler.py +++ b/src/chat/message_receive/message_handler.py @@ -27,21 +27,17 @@ from __future__ import annotations -import asyncio import os import re -import time import traceback -from functools import partial from typing import TYPE_CHECKING, Any from mofox_wire import MessageEnvelope, MessageRuntime from src.chat.message_manager import message_manager from src.chat.message_receive.storage import MessageStorage -from src.chat.utils.prompt import global_prompt_manager from src.chat.utils.utils import is_mentioned_bot_in_message -from src.common.data_models.database_data_model import DatabaseMessages, DatabaseUserInfo, DatabaseGroupInfo +from src.common.data_models.database_data_model import DatabaseGroupInfo, DatabaseMessages, DatabaseUserInfo from src.common.logger import get_logger from src.config.config import global_config from src.mood.mood_manager import mood_manager @@ -49,8 +45,8 @@ from src.plugin_system.base import BaseCommand, EventType from src.plugin_system.core import component_registry, event_manager, global_announcement_manager if TYPE_CHECKING: - from src.common.core_sink_manager import CoreSinkManager from src.chat.message_receive.chat_stream import ChatStream + from src.common.core_sink_manager import CoreSinkManager logger = get_logger("message_handler") @@ -82,16 +78,16 @@ def _check_ban_regex(text: str, chat: "ChatStream", userinfo) -> bool: class MessageHandler: """ 统一消息处理器 - + 利用 MessageRuntime 的路由功能,将消息处理逻辑注册为路由和钩子。 - + 架构说明: - 在 register_handlers() 中向 MessageRuntime 注册各种处理器 - 使用 @runtime.on_message(message_type=...) 按消息类型路由 - 使用 before_hook 进行消息预处理 - 使用 after_hook 进行消息后处理 - 使用 error_hook 统一处理异常 - + 主要功能: 1. 消息预处理:ID标准化、过滤检查 2. 适配器响应处理:处理 adapter_response 类型消息 @@ -113,44 +109,44 @@ class MessageHandler: def register_handlers(self, runtime: MessageRuntime) -> None: """ 向 MessageRuntime 注册消息处理器和钩子 - + 这是核心方法,在系统初始化时调用,将所有处理逻辑注册到 runtime。 - + Args: runtime: MessageRuntime 实例 """ self._runtime = runtime - + # 注册前置钩子:消息预处理和过滤 runtime.register_before_hook(self._before_hook) - + # 注册后置钩子:存储、情绪更新等 runtime.register_after_hook(self._after_hook) - + # 注册错误钩子:统一异常处理 runtime.register_error_hook(self._error_hook) - + # 注册适配器响应处理器(最高优先级) def _is_adapter_response(env: MessageEnvelope) -> bool: segment = env.get("message_segment") if isinstance(segment, dict): return segment.get("type") == "adapter_response" return False - + runtime.add_route( predicate=_is_adapter_response, handler=self._handle_adapter_response_route, name="adapter_response_handler", message_type="adapter_response", ) - + # 注册默认消息处理器(处理所有其他消息) runtime.add_route( predicate=lambda _: True, # 匹配所有消息 handler=self._handle_normal_message, name="default_message_handler", ) - + logger.info("MessageHandler 已向 MessageRuntime 注册处理器和钩子") async def ensure_started(self) -> None: @@ -169,7 +165,7 @@ class MessageHandler: async def _before_hook(self, envelope: MessageEnvelope) -> None: """ 前置钩子:消息预处理 - + 1. 标准化 ID 为字符串 2. 检查是否为 echo 消息(自身发送的消息上报) 3. 附加预处理数据到 envelope(chat_stream, message 等) @@ -211,7 +207,7 @@ class MessageHandler: async def _after_hook(self, envelope: MessageEnvelope) -> None: """ 后置钩子:消息后处理 - + 在消息处理完成后执行的清理工作 """ # 后置处理逻辑(如有需要) @@ -242,7 +238,7 @@ class MessageHandler: async def _handle_normal_message(self, envelope: MessageEnvelope) -> MessageEnvelope | None: """ 默认消息处理器:处理普通消息 - + 1. 获取或创建聊天流 2. 转换为 DatabaseMessages 3. 过滤检查 @@ -261,10 +257,10 @@ class MessageHandler: if not user_info and not group_info: logger.debug("消息缺少用户信息,已跳过处理") return None - + # 获取或创建聊天流 platform = message_info.get("platform", "unknown") - + from src.chat.message_receive.chat_stream import get_chat_manager chat = await get_chat_manager().get_or_create_stream( platform=platform, @@ -325,10 +321,10 @@ class MessageHandler: async def process_message(self, envelope: MessageEnvelope) -> None: """ 处理接收到的消息信封(向后兼容) - + 注意:此方法已被 MessageRuntime 路由取代。 如果直接调用此方法,它会委托给 runtime.handle_message()。 - + Args: envelope: 消息信封(来自适配器) """ @@ -360,8 +356,8 @@ class MessageHandler: # 触发消息事件 result = await event_manager.trigger_event( - EventType.ON_MESSAGE, - permission_group="SYSTEM", + EventType.ON_MESSAGE, + permission_group="SYSTEM", message=message ) if result and not result.all_continue_process(): @@ -379,8 +375,8 @@ class MessageHandler: logger.error(traceback.format_exc()) async def _process_plus_commands( - self, - message: DatabaseMessages, + self, + message: DatabaseMessages, chat: "ChatStream" ) -> tuple[bool, Any, bool]: """处理 PlusCommand 系统""" @@ -490,8 +486,8 @@ class MessageHandler: return False, None, True async def _process_base_commands( - self, - message: DatabaseMessages, + self, + message: DatabaseMessages, chat: "ChatStream" ) -> tuple[bool, Any, bool]: """处理传统 BaseCommand 系统""" diff --git a/src/common/core_sink_manager.py b/src/common/core_sink_manager.py index 68ea7d1a6..4bd0a201a 100644 --- a/src/common/core_sink_manager.py +++ b/src/common/core_sink_manager.py @@ -20,9 +20,9 @@ CoreSink 统一管理器 from __future__ import annotations import asyncio -import contextlib import multiprocessing as mp -from typing import TYPE_CHECKING, Any, Awaitable, Callable, Dict, Optional +from collections.abc import Awaitable, Callable +from typing import Any from mofox_wire import ( InProcessCoreSink, @@ -33,9 +33,6 @@ from mofox_wire import ( from src.common.logger import get_logger -if TYPE_CHECKING: - from chat.message_receive.message_handler import MessageHandler - logger = get_logger("core_sink_manager") @@ -46,20 +43,20 @@ MessageHandlerCallback = Callable[[MessageEnvelope], Awaitable[None]] class CoreSinkManager: """ CoreSink 统一管理器 - + 管理 InProcessCoreSink 和 ProcessCoreSinkServer 双实例, 集成 MessageRuntime 提供统一的消息路由和收发接口。 - + 架构说明: - InProcessCoreSink: 用于同进程内的适配器(run_in_subprocess=False) - ProcessCoreSinkServer: 用于管理与子进程适配器的通信 - MessageRuntime: 统一消息路由,支持 @on_message 装饰器和钩子机制 - + 消息流向: 1. 适配器(同进程)→ InProcessCoreSink → MessageRuntime.handle_message() → 注册的处理器 2. 适配器(子进程)→ ProcessCoreSinkServer → MessageRuntime.handle_message() → 注册的处理器 3. 核心回复 → CoreSinkManager.send_outgoing → 适配器 - + 使用 MessageRuntime 的优势: - 支持 @runtime.on_message(message_type="xxx") 按消息类型路由 - 支持 before_hook/after_hook/error_hook 统一处理流程 @@ -70,40 +67,43 @@ class CoreSinkManager: def __init__(self): # MessageRuntime 实例 self._runtime: MessageRuntime = MessageRuntime() - + # InProcessCoreSink 实例(用于同进程适配器) self._in_process_sink: InProcessCoreSink | None = None - + # 子进程通信管理 # key: adapter_name, value: (ProcessCoreSinkServer, incoming_queue, outgoing_queue) - self._process_sinks: Dict[str, tuple[ProcessCoreSinkServer, mp.Queue, mp.Queue]] = {} - + self._process_sinks: dict[str, tuple[ProcessCoreSinkServer, mp.Queue, mp.Queue]] = {} + # multiprocessing context self._mp_ctx = mp.get_context("spawn") - + # 运行状态 self._running = False self._initialized = False + # 后台任务集合(防止任务被垃圾回收) + self._background_tasks: set[asyncio.Task] = set() + @property def runtime(self) -> MessageRuntime: """ 获取 MessageRuntime 实例 - + 外部模块可以通过此属性注册消息处理器、钩子等: - + ```python manager = get_core_sink_manager() - + # 注册消息处理器 @manager.runtime.on_message(message_type="text") async def handle_text(envelope: MessageEnvelope): ... - + # 注册前置钩子 manager.runtime.register_before_hook(my_before_hook) ``` - + Returns: MessageRuntime 实例 """ @@ -112,31 +112,31 @@ class CoreSinkManager: async def initialize(self) -> None: """ 初始化 CoreSink 管理器 - + 创建 InProcessCoreSink,将收到的消息交给 MessageRuntime 处理。 """ if self._initialized: logger.warning("CoreSinkManager 已经初始化,跳过重复初始化") return - + logger.info("正在初始化 CoreSink 管理器...") - + # 创建 InProcessCoreSink,使用 MessageRuntime 作为消息处理入口 self._in_process_sink = InProcessCoreSink(self._dispatch_to_runtime) - + self._running = True self._initialized = True - + logger.info("CoreSink 管理器初始化完成(已集成 MessageRuntime)") async def shutdown(self) -> None: """关闭 CoreSink 管理器""" if not self._running: return - + logger.info("正在关闭 CoreSink 管理器...") self._running = False - + # 关闭所有 ProcessCoreSinkServer for adapter_name, (server, _, _) in list(self._process_sinks.items()): try: @@ -144,26 +144,26 @@ class CoreSinkManager: logger.info(f"已关闭适配器 {adapter_name} 的 ProcessCoreSinkServer") except Exception as e: logger.error(f"关闭适配器 {adapter_name} 的 ProcessCoreSinkServer 时出错: {e}") - + self._process_sinks.clear() - + # 关闭 InProcessCoreSink if self._in_process_sink: await self._in_process_sink.close() self._in_process_sink = None - + self._initialized = False logger.info("CoreSink 管理器已关闭") def get_in_process_sink(self) -> InProcessCoreSink: """ 获取 InProcessCoreSink 实例 - + 用于同进程运行的适配器 - + Returns: InProcessCoreSink 实例 - + Raises: RuntimeError: 如果管理器未初始化 """ @@ -174,34 +174,36 @@ class CoreSinkManager: def create_process_sink_queues(self, adapter_name: str) -> tuple[mp.Queue, mp.Queue]: """ 为子进程适配器创建通信队列 - + 创建 incoming 和 outgoing 队列对,用于与子进程适配器通信。 同时创建 ProcessCoreSinkServer 来处理消息转发。 - + Args: adapter_name: 适配器名称 - + Returns: (to_core_queue, from_core_queue) 元组 - to_core_queue: 子进程发送到核心的队列 - from_core_queue: 核心发送到子进程的队列 - + Raises: RuntimeError: 如果管理器未初始化 """ if not self._initialized: raise RuntimeError("CoreSinkManager 未初始化,请先调用 initialize()") - + if adapter_name in self._process_sinks: logger.warning(f"适配器 {adapter_name} 的队列已存在,将被覆盖") # 先关闭旧的 old_server, _, _ = self._process_sinks[adapter_name] - asyncio.create_task(old_server.close()) - + task = asyncio.create_task(old_server.close()) + self._background_tasks.add(task) + task.add_done_callback(self._background_tasks.discard) + # 创建通信队列 incoming_queue = self._mp_ctx.Queue() # 子进程 → 核心 outgoing_queue = self._mp_ctx.Queue() # 核心 → 子进程 - + # 创建 ProcessCoreSinkServer,使用 MessageRuntime 处理消息 server = ProcessCoreSinkServer( incoming_queue=incoming_queue, @@ -209,48 +211,50 @@ class CoreSinkManager: core_handler=self._dispatch_to_runtime, name=adapter_name, ) - + # 启动服务器 server.start() - + # 存储引用 self._process_sinks[adapter_name] = (server, incoming_queue, outgoing_queue) - + logger.info(f"为适配器 {adapter_name} 创建了 ProcessCoreSink 通信队列") - + return incoming_queue, outgoing_queue def remove_process_sink(self, adapter_name: str) -> None: """ 移除子进程适配器的通信队列 - + Args: adapter_name: 适配器名称 """ if adapter_name not in self._process_sinks: logger.warning(f"适配器 {adapter_name} 的队列不存在") return - + server, _, _ = self._process_sinks.pop(adapter_name) - asyncio.create_task(server.close()) + task = asyncio.create_task(server.close()) + self._background_tasks.add(task) + task.add_done_callback(self._background_tasks.discard) logger.info(f"已移除适配器 {adapter_name} 的 ProcessCoreSink 通信队列") async def send_outgoing( - self, - envelope: MessageEnvelope, + self, + envelope: MessageEnvelope, platform: str | None = None, adapter_name: str | None = None ) -> None: """ 发送消息到适配器 - + 根据 platform 或 adapter_name 路由到正确的适配器。 - + Args: envelope: 消息信封 platform: 目标平台(可选) adapter_name: 目标适配器名称(可选) - + 路由规则: 1. 如果指定了 adapter_name,直接发送到该适配器 2. 如果指定了 platform,发送到所有匹配平台的适配器 @@ -259,11 +263,11 @@ class CoreSinkManager: # 从 envelope 中获取 platform if platform is None: platform = envelope.get("platform") or envelope.get("message_info", {}).get("platform") - + # 发送到 InProcessCoreSink(会自动广播到所有注册的 outgoing handler) if self._in_process_sink: await self._in_process_sink.push_outgoing(envelope) - + # 发送到所有 ProcessCoreSinkServer for name, (server, _, _) in self._process_sinks.items(): if adapter_name and name != adapter_name: @@ -276,17 +280,17 @@ class CoreSinkManager: async def _dispatch_to_runtime(self, envelope: MessageEnvelope) -> None: """ 将消息分发给 MessageRuntime 处理 - + 这是内部方法,由 InProcessCoreSink 和 ProcessCoreSinkServer 调用。 所有从适配器接收到的消息都会经过这里,然后交给 MessageRuntime 路由。 - + Args: envelope: 消息信封 """ if not self._running: logger.warning("CoreSinkManager 未运行,忽略接收到的消息") return - + try: # 使用 MessageRuntime 处理消息 await self._runtime.handle_message(envelope) @@ -309,19 +313,19 @@ def get_core_sink_manager() -> CoreSinkManager: def get_message_runtime() -> MessageRuntime: """ 获取全局 MessageRuntime 实例 - + 这是获取 MessageRuntime 的推荐方式,用于注册消息处理器、钩子等: - + ```python from src.common.core_sink_manager import get_message_runtime - + runtime = get_message_runtime() - + @runtime.on_message(message_type="text") async def handle_text(envelope: MessageEnvelope): ... ``` - + Returns: MessageRuntime 实例 """ @@ -331,7 +335,7 @@ def get_message_runtime() -> MessageRuntime: async def initialize_core_sink_manager() -> CoreSinkManager: """ 初始化 CoreSinkManager 单例 - + Returns: 初始化后的 CoreSinkManager 实例 """ @@ -355,9 +359,9 @@ async def shutdown_core_sink_manager() -> None: def get_core_sink() -> InProcessCoreSink: """ 获取 InProcessCoreSink 实例(向后兼容) - + 这是旧版 API,推荐使用 get_core_sink_manager().get_in_process_sink() - + Returns: InProcessCoreSink 实例 """ @@ -367,7 +371,7 @@ def get_core_sink() -> InProcessCoreSink: def set_core_sink(sink: Any) -> None: """ 设置 CoreSink(向后兼容,现已弃用) - + 新架构中 CoreSink 由 CoreSinkManager 统一管理,不再支持外部设置。 此函数保留仅为兼容旧代码,调用会记录警告日志。 """ @@ -380,7 +384,7 @@ def set_core_sink(sink: Any) -> None: async def push_outgoing(envelope: MessageEnvelope) -> None: """ 将消息推送到所有适配器(向后兼容) - + Args: envelope: 消息信封 """ @@ -390,12 +394,12 @@ async def push_outgoing(envelope: MessageEnvelope) -> None: __all__ = [ "CoreSinkManager", + # 向后兼容 + "get_core_sink", "get_core_sink_manager", "get_message_runtime", "initialize_core_sink_manager", - "shutdown_core_sink_manager", - # 向后兼容 - "get_core_sink", - "set_core_sink", "push_outgoing", + "set_core_sink", + "shutdown_core_sink_manager", ]