fix(chat): 优化线程环境下的同步数据库调用逻辑
重构了 `_sync_db_get` 函数,以解决在复杂线程和事件循环场景下可能出现的死锁和不稳定性问题。 旧的实现逻辑过于复杂,试图处理多种事件循环状态,容易出错。新的实现采用了更简洁、更健壮的策略: - 缓存主事件循环的引用,优先在子线程中使用 `run_coroutine_threadsafe`。 - 当无法使用主循环或在主线程中时,回退到使用 `asyncio.run()` 创建新循环来执行异步任务,这是一种更安全的模式。 - 增加了保护措施,避免在已运行的主事件循环中调用时造成阻塞。 此外,此提交还修复了日志记录器中可能添加空前缀导致格式异常的小问题。
This commit is contained in:
@@ -15,67 +15,67 @@ logger = get_logger("maibot_statistic")
|
|||||||
|
|
||||||
|
|
||||||
# 同步包装器函数,用于在非异步环境中调用异步数据库API
|
# 同步包装器函数,用于在非异步环境中调用异步数据库API
|
||||||
|
# 全局存储主事件循环引用
|
||||||
|
_main_event_loop = None
|
||||||
|
|
||||||
|
def _get_main_loop():
|
||||||
|
"""获取主事件循环的引用"""
|
||||||
|
global _main_event_loop
|
||||||
|
if _main_event_loop is None:
|
||||||
|
try:
|
||||||
|
_main_event_loop = asyncio.get_running_loop()
|
||||||
|
except RuntimeError:
|
||||||
|
# 如果没有运行的循环,尝试获取默认循环
|
||||||
|
try:
|
||||||
|
_main_event_loop = asyncio.get_event_loop_policy().get_event_loop()
|
||||||
|
except Exception:
|
||||||
|
pass
|
||||||
|
return _main_event_loop
|
||||||
|
|
||||||
def _sync_db_get(model_class, filters=None, order_by=None, limit=None, single_result=False):
|
def _sync_db_get(model_class, filters=None, order_by=None, limit=None, single_result=False):
|
||||||
"""同步版本的db_get,用于在线程池中调用"""
|
"""同步版本的db_get,用于在线程池中调用"""
|
||||||
import asyncio
|
import asyncio
|
||||||
|
|
||||||
# sourcery skip: use-contextlib-suppress
|
|
||||||
"""
|
|
||||||
一个线程安全的、同步的db_get包装器。
|
|
||||||
用于从非异步的线程(如线程池)中安全地调用异步的db_get函数。
|
|
||||||
"""
|
|
||||||
import asyncio
|
|
||||||
from concurrent.futures import Future
|
|
||||||
import threading
|
import threading
|
||||||
|
|
||||||
main_loop = None
|
|
||||||
try:
|
try:
|
||||||
main_loop = asyncio.get_running_loop()
|
# 优先尝试获取预存的主事件循环
|
||||||
except RuntimeError:
|
main_loop = _get_main_loop()
|
||||||
# 如果在主线程中,但事件循环没有运行,就获取它
|
|
||||||
main_loop = asyncio.get_event_loop_policy().get_event_loop()
|
# 如果在子线程中且有主循环可用
|
||||||
|
if threading.current_thread() is not threading.main_thread() and main_loop:
|
||||||
# 如果当前线程不是主线程(即事件循环所在的线程)
|
try:
|
||||||
if threading.current_thread() is not threading.main_thread():
|
if not main_loop.is_closed():
|
||||||
future = asyncio.run_coroutine_threadsafe(
|
future = asyncio.run_coroutine_threadsafe(
|
||||||
db_get(model_class, filters, limit, order_by, single_result), main_loop
|
db_get(model_class, filters, limit, order_by, single_result), main_loop
|
||||||
)
|
)
|
||||||
try:
|
return future.result(timeout=30)
|
||||||
# 设置超时以防止永久阻塞
|
except Exception as e:
|
||||||
return future.result(timeout=30)
|
# 如果使用主循环失败,才在子线程创建新循环
|
||||||
except Exception as e:
|
logger.debug(f"使用主事件循环失败({e}),在子线程中创建新循环")
|
||||||
logger.error(f"在 _sync_db_get 的子线程中发生错误: {e}")
|
return asyncio.run(db_get(model_class, filters, limit, order_by, single_result))
|
||||||
return None
|
|
||||||
else:
|
# 如果在主线程中,直接运行
|
||||||
# 如果就在主线程,检查循环是否正在运行
|
if threading.current_thread() is threading.main_thread():
|
||||||
if main_loop.is_running():
|
try:
|
||||||
# 不应该在正在运行的循环上调用 run_until_complete
|
# 检查是否有当前运行的循环
|
||||||
# 这种情况很复杂,理论上不应该发生在一个设计良好的应用中
|
current_loop = asyncio.get_running_loop()
|
||||||
# 但如果发生了,我们尝试用 create_task 和同步等待的方式处理
|
if current_loop.is_running():
|
||||||
# 注意:这可能会导致死锁,如果主循环也在等待这个结果
|
# 主循环正在运行,返回空结果避免阻塞
|
||||||
logger.warning("在正在运行的主事件循环中同步调用了异步函数,这可能导致死锁。")
|
logger.debug("在运行中的主事件循环中跳过同步数据库查询")
|
||||||
future = Future()
|
return []
|
||||||
|
except RuntimeError:
|
||||||
async def task_wrapper():
|
# 没有运行的循环,可以安全创建
|
||||||
try:
|
pass
|
||||||
result = await db_get(model_class, filters, limit, order_by, single_result)
|
|
||||||
future.set_result(result)
|
|
||||||
except Exception as e_inner:
|
|
||||||
future.set_exception(e_inner)
|
|
||||||
|
|
||||||
asyncio.create_task(task_wrapper())
|
# 创建新循环运行查询
|
||||||
try:
|
return asyncio.run(db_get(model_class, filters, limit, order_by, single_result))
|
||||||
return future.result(timeout=30)
|
|
||||||
except Exception as e:
|
# 最后的兜底方案:在子线程创建新循环
|
||||||
logger.error(f"在 _sync_db_get 的主线程(运行中)中发生错误: {e}")
|
return asyncio.run(db_get(model_class, filters, limit, order_by, single_result))
|
||||||
return None
|
|
||||||
else:
|
except Exception as e:
|
||||||
# 如果主循环没有运行,我们可以安全地使用它来运行我们的任务
|
logger.error(f"_sync_db_get 执行过程中发生错误: {e}")
|
||||||
try:
|
return []
|
||||||
return main_loop.run_until_complete(db_get(model_class, filters, limit, order_by, single_result))
|
|
||||||
except Exception as e:
|
|
||||||
logger.error(f"在 _sync_db_get 的主线程(未运行)中发生错误: {e}")
|
|
||||||
return None
|
|
||||||
|
|
||||||
|
|
||||||
# 统计数据的键
|
# 统计数据的键
|
||||||
|
|||||||
@@ -815,7 +815,8 @@ class ModuleColoredConsoleRenderer:
|
|||||||
# 重新组合
|
# 重新组合
|
||||||
# parts.append(prefix_colored + thought_colored)
|
# parts.append(prefix_colored + thought_colored)
|
||||||
# 将前缀和思考内容作为独立的part添加,避免它们之间出现多余的空格
|
# 将前缀和思考内容作为独立的part添加,避免它们之间出现多余的空格
|
||||||
parts.append(prefix_colored)
|
if prefix_colored:
|
||||||
|
parts.append(prefix_colored)
|
||||||
parts.append(thought_colored)
|
parts.append(thought_colored)
|
||||||
|
|
||||||
elif module_color:
|
elif module_color:
|
||||||
|
|||||||
Reference in New Issue
Block a user