diff --git a/src/chat/utils/statistic.py b/src/chat/utils/statistic.py index 180bcb599..c03ae47cc 100644 --- a/src/chat/utils/statistic.py +++ b/src/chat/utils/statistic.py @@ -15,67 +15,67 @@ logger = get_logger("maibot_statistic") # 同步包装器函数,用于在非异步环境中调用异步数据库API +# 全局存储主事件循环引用 +_main_event_loop = None + +def _get_main_loop(): + """获取主事件循环的引用""" + global _main_event_loop + if _main_event_loop is None: + try: + _main_event_loop = asyncio.get_running_loop() + except RuntimeError: + # 如果没有运行的循环,尝试获取默认循环 + try: + _main_event_loop = asyncio.get_event_loop_policy().get_event_loop() + except Exception: + pass + return _main_event_loop + def _sync_db_get(model_class, filters=None, order_by=None, limit=None, single_result=False): """同步版本的db_get,用于在线程池中调用""" import asyncio - - # sourcery skip: use-contextlib-suppress - """ - 一个线程安全的、同步的db_get包装器。 - 用于从非异步的线程(如线程池)中安全地调用异步的db_get函数。 - """ - import asyncio - from concurrent.futures import Future import threading - - main_loop = None + try: - main_loop = asyncio.get_running_loop() - except RuntimeError: - # 如果在主线程中,但事件循环没有运行,就获取它 - main_loop = asyncio.get_event_loop_policy().get_event_loop() - - # 如果当前线程不是主线程(即事件循环所在的线程) - if threading.current_thread() is not threading.main_thread(): - future = asyncio.run_coroutine_threadsafe( - db_get(model_class, filters, limit, order_by, single_result), main_loop - ) - try: - # 设置超时以防止永久阻塞 - return future.result(timeout=30) - except Exception as e: - logger.error(f"在 _sync_db_get 的子线程中发生错误: {e}") - return None - else: - # 如果就在主线程,检查循环是否正在运行 - if main_loop.is_running(): - # 不应该在正在运行的循环上调用 run_until_complete - # 这种情况很复杂,理论上不应该发生在一个设计良好的应用中 - # 但如果发生了,我们尝试用 create_task 和同步等待的方式处理 - # 注意:这可能会导致死锁,如果主循环也在等待这个结果 - logger.warning("在正在运行的主事件循环中同步调用了异步函数,这可能导致死锁。") - future = Future() - - async def task_wrapper(): - try: - result = await db_get(model_class, filters, limit, order_by, single_result) - future.set_result(result) - except Exception as e_inner: - future.set_exception(e_inner) + # 优先尝试获取预存的主事件循环 + main_loop = _get_main_loop() + + # 如果在子线程中且有主循环可用 + if threading.current_thread() is not threading.main_thread() and main_loop: + try: + if not main_loop.is_closed(): + future = asyncio.run_coroutine_threadsafe( + db_get(model_class, filters, limit, order_by, single_result), main_loop + ) + return future.result(timeout=30) + except Exception as e: + # 如果使用主循环失败,才在子线程创建新循环 + logger.debug(f"使用主事件循环失败({e}),在子线程中创建新循环") + return asyncio.run(db_get(model_class, filters, limit, order_by, single_result)) + + # 如果在主线程中,直接运行 + if threading.current_thread() is threading.main_thread(): + try: + # 检查是否有当前运行的循环 + current_loop = asyncio.get_running_loop() + if current_loop.is_running(): + # 主循环正在运行,返回空结果避免阻塞 + logger.debug("在运行中的主事件循环中跳过同步数据库查询") + return [] + except RuntimeError: + # 没有运行的循环,可以安全创建 + pass - asyncio.create_task(task_wrapper()) - try: - return future.result(timeout=30) - except Exception as e: - logger.error(f"在 _sync_db_get 的主线程(运行中)中发生错误: {e}") - return None - else: - # 如果主循环没有运行,我们可以安全地使用它来运行我们的任务 - try: - return main_loop.run_until_complete(db_get(model_class, filters, limit, order_by, single_result)) - except Exception as e: - logger.error(f"在 _sync_db_get 的主线程(未运行)中发生错误: {e}") - return None + # 创建新循环运行查询 + return asyncio.run(db_get(model_class, filters, limit, order_by, single_result)) + + # 最后的兜底方案:在子线程创建新循环 + return asyncio.run(db_get(model_class, filters, limit, order_by, single_result)) + + except Exception as e: + logger.error(f"_sync_db_get 执行过程中发生错误: {e}") + return [] # 统计数据的键 diff --git a/src/common/logger.py b/src/common/logger.py index 7ceebfa6b..1c01e4a82 100644 --- a/src/common/logger.py +++ b/src/common/logger.py @@ -815,7 +815,8 @@ class ModuleColoredConsoleRenderer: # 重新组合 # parts.append(prefix_colored + thought_colored) # 将前缀和思考内容作为独立的part添加,避免它们之间出现多余的空格 - parts.append(prefix_colored) + if prefix_colored: + parts.append(prefix_colored) parts.append(thought_colored) elif module_color: