diff --git a/src/main.py b/src/main.py index f58fc0d9d..621014ae6 100644 --- a/src/main.py +++ b/src/main.py @@ -14,7 +14,7 @@ from .plugins.chat.storage import MessageStorage from .plugins.config.config import global_config from .plugins.chat.bot import chat_bot from .common.logger import get_module_logger -from .plugins.remote import heartbeat_thread # noqa: F401 +from .plugins.remote import heartbeat_thread # noqa: F401 logger = get_module_logger("main") @@ -108,7 +108,6 @@ class MainSystem: self.remove_recalled_message_task(), emoji_manager.start_periodic_check(), self.app.run(), - self.app.message_process(), ] await asyncio.gather(*tasks) diff --git a/src/plugins/message/api.py b/src/plugins/message/api.py index 0478aab16..30cc8aeca 100644 --- a/src/plugins/message/api.py +++ b/src/plugins/message/api.py @@ -26,11 +26,20 @@ class BaseMessageAPI: @self.app.post("/api/message") async def handle_message(message: Dict[str, Any]): try: - self.cache.append(message) + # 创建后台任务处理消息 + asyncio.create_task(self._background_message_handler(message)) return {"status": "success"} except Exception as e: raise HTTPException(status_code=500, detail=str(e)) from e + async def _background_message_handler(self, message: Dict[str, Any]): + """后台处理单个消息""" + try: + await self.process_single_message(message) + except Exception as e: + logger.error(f"Background message processing failed: {str(e)}") + logger.error(traceback.format_exc()) + def register_message_handler(self, handler: Callable): """注册消息处理函数""" self.message_handlers.append(handler) @@ -45,23 +54,17 @@ class BaseMessageAPI: # logger.error(f"发送消息失败: {str(e)}") pass - async def message_process( - self, - ): - """启动消息处理""" - while True: - if len(self.cache) > 0: - for handler in self.message_handlers: - try: - await handler(self.cache[0]) - except Exception as e: - logger.error(str(e)) - logger.error(traceback.format_exc()) - self.cache.pop(0) - if len(self.cache) > 0: - await asyncio.sleep(0.1 / len(self.cache)) - else: - await asyncio.sleep(0.2) + async def process_single_message(self, message: Dict[str, Any]): + """处理单条消息""" + tasks = [] + for handler in self.message_handlers: + try: + tasks.append(handler(message)) + except Exception as e: + logger.error(str(e)) + logger.error(traceback.format_exc()) + if tasks: + await asyncio.gather(*tasks, return_exceptions=True) def run_sync(self): """同步方式运行服务器"""