refactor(logging): 简化日志记录,移除冗余调试信息
delete(connection_pool): 移除连接池管理器相关代码
This commit is contained in:
@@ -129,16 +129,6 @@ class ChatStream:
|
|||||||
# 直接使用传入的 DatabaseMessages,设置到上下文中
|
# 直接使用传入的 DatabaseMessages,设置到上下文中
|
||||||
self.context.set_current_message(message)
|
self.context.set_current_message(message)
|
||||||
|
|
||||||
# 调试日志
|
|
||||||
logger.debug(
|
|
||||||
f"消息上下文已设置 - message_id: {message.message_id}, "
|
|
||||||
f"chat_id: {message.chat_id}, "
|
|
||||||
f"is_mentioned: {message.is_mentioned}, "
|
|
||||||
f"is_emoji: {message.is_emoji}, "
|
|
||||||
f"is_picid: {message.is_picid}, "
|
|
||||||
f"interest_value: {message.interest_value}"
|
|
||||||
)
|
|
||||||
|
|
||||||
def _safe_get_actions(self, message: DatabaseMessages) -> list | None:
|
def _safe_get_actions(self, message: DatabaseMessages) -> list | None:
|
||||||
"""安全获取消息的actions字段"""
|
"""安全获取消息的actions字段"""
|
||||||
import json
|
import json
|
||||||
|
|||||||
@@ -616,20 +616,20 @@ class StreamContext(BaseDataModel):
|
|||||||
# 如果没有指定类型要求,默认为支持
|
# 如果没有指定类型要求,默认为支持
|
||||||
return True
|
return True
|
||||||
|
|
||||||
logger.debug(f"[check_types] 检查消息是否支持类型: {types}")
|
# logger.debug(f"[check_types] 检查消息是否支持类型: {types}") # 简化日志,避免冗余
|
||||||
|
|
||||||
# 优先从additional_config中获取format_info
|
# 优先从additional_config中获取format_info
|
||||||
if hasattr(self.current_message, "additional_config") and self.current_message.additional_config:
|
if hasattr(self.current_message, "additional_config") and self.current_message.additional_config:
|
||||||
import orjson
|
import orjson
|
||||||
try:
|
try:
|
||||||
logger.debug(f"[check_types] additional_config 类型: {type(self.current_message.additional_config)}")
|
# logger.debug(f"[check_types] additional_config 类型: {type(self.current_message.additional_config)}") # 简化日志
|
||||||
config = orjson.loads(self.current_message.additional_config)
|
config = orjson.loads(self.current_message.additional_config)
|
||||||
logger.debug(f"[check_types] 解析后的 config 键: {config.keys() if isinstance(config, dict) else 'N/A'}")
|
# logger.debug(f"[check_types] 解析后的 config 键: {config.keys() if isinstance(config, dict) else 'N/A'}") # 简化日志
|
||||||
|
|
||||||
# 检查format_info结构
|
# 检查format_info结构
|
||||||
if "format_info" in config:
|
if "format_info" in config:
|
||||||
format_info = config["format_info"]
|
format_info = config["format_info"]
|
||||||
logger.debug(f"[check_types] 找到 format_info: {format_info}")
|
# logger.debug(f"[check_types] 找到 format_info: {format_info}") # 简化日志
|
||||||
|
|
||||||
# 方法1: 直接检查accept_format字段
|
# 方法1: 直接检查accept_format字段
|
||||||
if "accept_format" in format_info:
|
if "accept_format" in format_info:
|
||||||
@@ -646,9 +646,9 @@ class StreamContext(BaseDataModel):
|
|||||||
# 检查所有请求的类型是否都被支持
|
# 检查所有请求的类型是否都被支持
|
||||||
for requested_type in types:
|
for requested_type in types:
|
||||||
if requested_type not in accept_format:
|
if requested_type not in accept_format:
|
||||||
logger.debug(f"[check_types] 消息不支持类型 '{requested_type}',支持的类型: {accept_format}")
|
# logger.debug(f"[check_types] 消息不支持类型 '{requested_type}',支持的类型: {accept_format}") # 简化日志
|
||||||
return False
|
return False
|
||||||
logger.debug("[check_types] ✅ 消息支持所有请求的类型 (来自 accept_format)")
|
# logger.debug("[check_types] ✅ 消息支持所有请求的类型 (来自 accept_format)") # 简化日志
|
||||||
return True
|
return True
|
||||||
|
|
||||||
# 方法2: 检查content_format字段(向后兼容)
|
# 方法2: 检查content_format字段(向后兼容)
|
||||||
@@ -665,9 +665,9 @@ class StreamContext(BaseDataModel):
|
|||||||
# 检查所有请求的类型是否都被支持
|
# 检查所有请求的类型是否都被支持
|
||||||
for requested_type in types:
|
for requested_type in types:
|
||||||
if requested_type not in content_format:
|
if requested_type not in content_format:
|
||||||
logger.debug(f"[check_types] 消息不支持类型 '{requested_type}',支持的内容格式: {content_format}")
|
# logger.debug(f"[check_types] 消息不支持类型 '{requested_type}',支持的内容格式: {content_format}") # 简化日志
|
||||||
return False
|
return False
|
||||||
logger.debug("[check_types] ✅ 消息支持所有请求的类型 (来自 content_format)")
|
# logger.debug("[check_types] ✅ 消息支持所有请求的类型 (来自 content_format)") # 简化日志
|
||||||
return True
|
return True
|
||||||
else:
|
else:
|
||||||
logger.warning("[check_types] [问题] additional_config 中没有 format_info 字段")
|
logger.warning("[check_types] [问题] additional_config 中没有 format_info 字段")
|
||||||
@@ -679,16 +679,16 @@ class StreamContext(BaseDataModel):
|
|||||||
|
|
||||||
# 备用方案:如果无法从additional_config获取格式信息,使用默认支持的类型
|
# 备用方案:如果无法从additional_config获取格式信息,使用默认支持的类型
|
||||||
# 大多数消息至少支持text类型
|
# 大多数消息至少支持text类型
|
||||||
logger.debug("[check_types] 使用备用方案:默认支持类型检查")
|
# logger.debug("[check_types] 使用备用方案:默认支持类型检查") # 简化日志
|
||||||
default_supported_types = ["text", "emoji"]
|
default_supported_types = ["text", "emoji"]
|
||||||
for requested_type in types:
|
for requested_type in types:
|
||||||
if requested_type not in default_supported_types:
|
if requested_type not in default_supported_types:
|
||||||
logger.debug(f"[check_types] 使用默认类型检查,消息可能不支持类型 '{requested_type}'")
|
# logger.debug(f"[check_types] 使用默认类型检查,消息可能不支持类型 '{requested_type}'") # 简化日志
|
||||||
# 对于非基础类型,返回False以避免错误
|
# 对于非基础类型,返回False以避免错误
|
||||||
if requested_type not in ["text", "emoji", "reply"]:
|
if requested_type not in ["text", "emoji", "reply"]:
|
||||||
logger.warning(f"[check_types] ❌ 备用方案拒绝类型 '{requested_type}'")
|
logger.warning(f"[check_types] ❌ 备用方案拒绝类型 '{requested_type}'")
|
||||||
return False
|
return False
|
||||||
logger.debug("[check_types] ✅ 备用方案通过所有类型检查")
|
# logger.debug("[check_types] ✅ 备用方案通过所有类型检查") # 简化日志
|
||||||
return True
|
return True
|
||||||
|
|
||||||
# ==================== 消息缓存系统方法 ====================
|
# ==================== 消息缓存系统方法 ====================
|
||||||
@@ -736,7 +736,7 @@ class StreamContext(BaseDataModel):
|
|||||||
list[DatabaseMessages]: 刷新的消息列表
|
list[DatabaseMessages]: 刷新的消息列表
|
||||||
"""
|
"""
|
||||||
if not self.message_cache:
|
if not self.message_cache:
|
||||||
logger.debug(f"StreamContext {self.stream_id} 缓存为空,无需刷新")
|
# 缓存为空是正常情况,不需要记录日志
|
||||||
return []
|
return []
|
||||||
|
|
||||||
try:
|
try:
|
||||||
|
|||||||
@@ -1,281 +0,0 @@
|
|||||||
"""
|
|
||||||
透明连接复用管理器
|
|
||||||
在不改变原有API的情况下,实现数据库连接的智能复用
|
|
||||||
"""
|
|
||||||
|
|
||||||
import asyncio
|
|
||||||
import time
|
|
||||||
from contextlib import asynccontextmanager
|
|
||||||
from typing import Any
|
|
||||||
|
|
||||||
from sqlalchemy import text
|
|
||||||
from sqlalchemy.ext.asyncio import AsyncSession, async_sessionmaker
|
|
||||||
|
|
||||||
from src.common.logger import get_logger
|
|
||||||
|
|
||||||
logger = get_logger("connection_pool_manager")
|
|
||||||
|
|
||||||
|
|
||||||
class ConnectionInfo:
|
|
||||||
"""连接信息包装器"""
|
|
||||||
|
|
||||||
def __init__(self, session: AsyncSession, created_at: float):
|
|
||||||
self.session = session
|
|
||||||
self.created_at = created_at
|
|
||||||
self.last_used = created_at
|
|
||||||
self.in_use = False
|
|
||||||
self.ref_count = 0
|
|
||||||
|
|
||||||
def mark_used(self):
|
|
||||||
"""标记连接被使用"""
|
|
||||||
self.last_used = time.time()
|
|
||||||
self.in_use = True
|
|
||||||
self.ref_count += 1
|
|
||||||
|
|
||||||
def mark_released(self):
|
|
||||||
"""标记连接被释放"""
|
|
||||||
self.in_use = False
|
|
||||||
self.ref_count = max(0, self.ref_count - 1)
|
|
||||||
|
|
||||||
def is_expired(self, max_lifetime: float = 300.0, max_idle: float = 60.0) -> bool:
|
|
||||||
"""检查连接是否过期"""
|
|
||||||
current_time = time.time()
|
|
||||||
|
|
||||||
# 检查总生命周期
|
|
||||||
if current_time - self.created_at > max_lifetime:
|
|
||||||
return True
|
|
||||||
|
|
||||||
# 检查空闲时间
|
|
||||||
if not self.in_use and current_time - self.last_used > max_idle:
|
|
||||||
return True
|
|
||||||
|
|
||||||
return False
|
|
||||||
|
|
||||||
async def close(self):
|
|
||||||
"""关闭连接"""
|
|
||||||
try:
|
|
||||||
# 使用 shield 保护 close 操作,确保即使任务被取消也能完成关闭
|
|
||||||
# 通过 `cast` 明确告知类型检查器 `shield` 的返回类型,避免类型错误
|
|
||||||
from typing import cast
|
|
||||||
await cast(asyncio.Future, asyncio.shield(self.session.close()))
|
|
||||||
logger.debug("连接已关闭")
|
|
||||||
except asyncio.CancelledError:
|
|
||||||
# 这是一个预期的行为,例如在流式聊天中断时
|
|
||||||
logger.debug("关闭连接时任务被取消")
|
|
||||||
# 重新抛出异常以确保任务状态正确
|
|
||||||
raise
|
|
||||||
except Exception as e:
|
|
||||||
logger.warning(f"关闭连接时出错: {e}")
|
|
||||||
|
|
||||||
|
|
||||||
class ConnectionPoolManager:
|
|
||||||
"""透明的连接池管理器"""
|
|
||||||
|
|
||||||
def __init__(self, max_pool_size: int = 10, max_lifetime: float = 300.0, max_idle: float = 60.0):
|
|
||||||
self.max_pool_size = max_pool_size
|
|
||||||
self.max_lifetime = max_lifetime
|
|
||||||
self.max_idle = max_idle
|
|
||||||
|
|
||||||
# 连接池
|
|
||||||
self._connections: set[ConnectionInfo] = set()
|
|
||||||
self._lock = asyncio.Lock()
|
|
||||||
|
|
||||||
# 统计信息
|
|
||||||
self._stats = {
|
|
||||||
"total_created": 0,
|
|
||||||
"total_reused": 0,
|
|
||||||
"total_expired": 0,
|
|
||||||
"active_connections": 0,
|
|
||||||
"pool_hits": 0,
|
|
||||||
"pool_misses": 0,
|
|
||||||
}
|
|
||||||
|
|
||||||
# 后台清理任务
|
|
||||||
self._cleanup_task: asyncio.Task | None = None
|
|
||||||
self._should_cleanup = False
|
|
||||||
|
|
||||||
logger.info(f"连接池管理器初始化完成 (最大池大小: {max_pool_size})")
|
|
||||||
|
|
||||||
async def start(self):
|
|
||||||
"""启动连接池管理器"""
|
|
||||||
if self._cleanup_task is None:
|
|
||||||
self._should_cleanup = True
|
|
||||||
self._cleanup_task = asyncio.create_task(self._cleanup_loop())
|
|
||||||
logger.info("连接池管理器已启动")
|
|
||||||
|
|
||||||
async def stop(self):
|
|
||||||
"""停止连接池管理器"""
|
|
||||||
self._should_cleanup = False
|
|
||||||
|
|
||||||
if self._cleanup_task:
|
|
||||||
self._cleanup_task.cancel()
|
|
||||||
try:
|
|
||||||
await self._cleanup_task
|
|
||||||
except asyncio.CancelledError:
|
|
||||||
pass
|
|
||||||
self._cleanup_task = None
|
|
||||||
|
|
||||||
# 关闭所有连接
|
|
||||||
await self._close_all_connections()
|
|
||||||
logger.info("连接池管理器已停止")
|
|
||||||
|
|
||||||
@asynccontextmanager
|
|
||||||
async def get_session(self, session_factory: async_sessionmaker[AsyncSession]):
|
|
||||||
"""
|
|
||||||
获取数据库会话的透明包装器
|
|
||||||
如果有可用连接则复用,否则创建新连接
|
|
||||||
"""
|
|
||||||
connection_info = None
|
|
||||||
|
|
||||||
try:
|
|
||||||
# 尝试获取现有连接
|
|
||||||
connection_info = await self._get_reusable_connection(session_factory)
|
|
||||||
|
|
||||||
if connection_info:
|
|
||||||
# 复用现有连接
|
|
||||||
connection_info.mark_used()
|
|
||||||
self._stats["total_reused"] += 1
|
|
||||||
self._stats["pool_hits"] += 1
|
|
||||||
logger.debug(f"复用现有连接 (活跃连接数: {len(self._connections)})")
|
|
||||||
else:
|
|
||||||
# 创建新连接
|
|
||||||
session = session_factory()
|
|
||||||
connection_info = ConnectionInfo(session, time.time())
|
|
||||||
|
|
||||||
async with self._lock:
|
|
||||||
self._connections.add(connection_info)
|
|
||||||
|
|
||||||
connection_info.mark_used()
|
|
||||||
self._stats["total_created"] += 1
|
|
||||||
self._stats["pool_misses"] += 1
|
|
||||||
logger.debug(f"创建新连接 (活跃连接数: {len(self._connections)})")
|
|
||||||
|
|
||||||
yield connection_info.session
|
|
||||||
|
|
||||||
except Exception:
|
|
||||||
# 发生错误时回滚连接
|
|
||||||
if connection_info and connection_info.session:
|
|
||||||
try:
|
|
||||||
await connection_info.session.rollback()
|
|
||||||
except Exception as rollback_error:
|
|
||||||
logger.warning(f"回滚连接时出错: {rollback_error}")
|
|
||||||
raise
|
|
||||||
finally:
|
|
||||||
# 释放连接回池中
|
|
||||||
if connection_info:
|
|
||||||
connection_info.mark_released()
|
|
||||||
|
|
||||||
async def _get_reusable_connection(
|
|
||||||
self, session_factory: async_sessionmaker[AsyncSession]
|
|
||||||
) -> ConnectionInfo | None:
|
|
||||||
"""获取可复用的连接"""
|
|
||||||
# 导入方言适配器获取 ping 查询
|
|
||||||
from src.common.database.core.dialect_adapter import DialectAdapter
|
|
||||||
|
|
||||||
ping_query = DialectAdapter.get_ping_query()
|
|
||||||
|
|
||||||
async with self._lock:
|
|
||||||
# 清理过期连接
|
|
||||||
await self._cleanup_expired_connections_locked()
|
|
||||||
|
|
||||||
# 查找可复用的连接
|
|
||||||
for connection_info in list(self._connections):
|
|
||||||
if not connection_info.in_use and not connection_info.is_expired(self.max_lifetime, self.max_idle):
|
|
||||||
# 验证连接是否仍然有效
|
|
||||||
try:
|
|
||||||
# 执行 ping 查询来验证连接
|
|
||||||
await connection_info.session.execute(text(ping_query))
|
|
||||||
return connection_info
|
|
||||||
except Exception as e:
|
|
||||||
logger.debug(f"连接验证失败,将移除: {e}")
|
|
||||||
await connection_info.close()
|
|
||||||
self._connections.remove(connection_info)
|
|
||||||
self._stats["total_expired"] += 1
|
|
||||||
|
|
||||||
# 检查是否可以创建新连接
|
|
||||||
if len(self._connections) >= self.max_pool_size:
|
|
||||||
logger.warning(f"连接池已满 ({len(self._connections)}/{self.max_pool_size}),等待复用")
|
|
||||||
return None
|
|
||||||
|
|
||||||
return None
|
|
||||||
|
|
||||||
async def _cleanup_expired_connections_locked(self):
|
|
||||||
"""清理过期连接(需要在锁内调用)"""
|
|
||||||
time.time()
|
|
||||||
expired_connections = [
|
|
||||||
connection_info for connection_info in list(self._connections)
|
|
||||||
if connection_info.is_expired(self.max_lifetime, self.max_idle) and not connection_info.in_use
|
|
||||||
]
|
|
||||||
|
|
||||||
for connection_info in expired_connections:
|
|
||||||
await connection_info.close()
|
|
||||||
self._connections.remove(connection_info)
|
|
||||||
self._stats["total_expired"] += 1
|
|
||||||
|
|
||||||
if expired_connections:
|
|
||||||
logger.debug(f"清理了 {len(expired_connections)} 个过期连接")
|
|
||||||
|
|
||||||
async def _cleanup_loop(self):
|
|
||||||
"""后台清理循环"""
|
|
||||||
while self._should_cleanup:
|
|
||||||
try:
|
|
||||||
await asyncio.sleep(30.0) # 每30秒清理一次
|
|
||||||
|
|
||||||
async with self._lock:
|
|
||||||
await self._cleanup_expired_connections_locked()
|
|
||||||
|
|
||||||
# 更新统计信息
|
|
||||||
self._stats["active_connections"] = len(self._connections)
|
|
||||||
|
|
||||||
except asyncio.CancelledError:
|
|
||||||
break
|
|
||||||
except Exception as e:
|
|
||||||
logger.error(f"连接池清理循环出错: {e}")
|
|
||||||
await asyncio.sleep(10.0)
|
|
||||||
|
|
||||||
async def _close_all_connections(self):
|
|
||||||
"""关闭所有连接"""
|
|
||||||
async with self._lock:
|
|
||||||
for connection_info in list(self._connections):
|
|
||||||
await connection_info.close()
|
|
||||||
|
|
||||||
self._connections.clear()
|
|
||||||
logger.info("所有连接已关闭")
|
|
||||||
|
|
||||||
def get_stats(self) -> dict[str, Any]:
|
|
||||||
"""获取连接池统计信息"""
|
|
||||||
return {
|
|
||||||
**self._stats,
|
|
||||||
"active_connections": len(self._connections),
|
|
||||||
"max_pool_size": self.max_pool_size,
|
|
||||||
"pool_efficiency": (
|
|
||||||
self._stats["pool_hits"] / max(1, self._stats["pool_hits"] + self._stats["pool_misses"])
|
|
||||||
)
|
|
||||||
* 100,
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
# 全局连接池管理器实例
|
|
||||||
_connection_pool_manager: ConnectionPoolManager | None = None
|
|
||||||
|
|
||||||
|
|
||||||
def get_connection_pool_manager() -> ConnectionPoolManager:
|
|
||||||
"""获取全局连接池管理器实例"""
|
|
||||||
global _connection_pool_manager
|
|
||||||
if _connection_pool_manager is None:
|
|
||||||
_connection_pool_manager = ConnectionPoolManager()
|
|
||||||
return _connection_pool_manager
|
|
||||||
|
|
||||||
|
|
||||||
async def start_connection_pool():
|
|
||||||
"""启动连接池"""
|
|
||||||
manager = get_connection_pool_manager()
|
|
||||||
await manager.start()
|
|
||||||
|
|
||||||
|
|
||||||
async def stop_connection_pool():
|
|
||||||
"""停止连接池"""
|
|
||||||
global _connection_pool_manager
|
|
||||||
if _connection_pool_manager:
|
|
||||||
await _connection_pool_manager.stop()
|
|
||||||
_connection_pool_manager = None
|
|
||||||
@@ -87,7 +87,7 @@ async def _apply_session_settings(session: AsyncSession, db_type: str) -> None:
|
|||||||
async def get_db_session() -> AsyncGenerator[AsyncSession, None]:
|
async def get_db_session() -> AsyncGenerator[AsyncSession, None]:
|
||||||
"""获取数据库会话上下文管理器
|
"""获取数据库会话上下文管理器
|
||||||
|
|
||||||
这是数据库操作的主要入口点,通过连接池管理器提供透明的连接复用。
|
这是数据库操作的主要入口点,直接从会话工厂获取独立会话。
|
||||||
|
|
||||||
支持的数据库:
|
支持的数据库:
|
||||||
- SQLite: 自动设置 busy_timeout 和外键约束
|
- SQLite: 自动设置 busy_timeout 和外键约束
|
||||||
@@ -101,20 +101,7 @@ async def get_db_session() -> AsyncGenerator[AsyncSession, None]:
|
|||||||
Yields:
|
Yields:
|
||||||
AsyncSession: SQLAlchemy异步会话对象
|
AsyncSession: SQLAlchemy异步会话对象
|
||||||
"""
|
"""
|
||||||
# 延迟导入避免循环依赖
|
async with get_db_session_direct() as session:
|
||||||
from ..optimization.connection_pool import get_connection_pool_manager
|
|
||||||
|
|
||||||
session_factory = await get_session_factory()
|
|
||||||
pool_manager = get_connection_pool_manager()
|
|
||||||
|
|
||||||
# 使用连接池管理器(透明复用连接)
|
|
||||||
async with pool_manager.get_session(session_factory) as session:
|
|
||||||
# 获取数据库类型并应用特定设置
|
|
||||||
from src.config.config import global_config
|
|
||||||
|
|
||||||
assert global_config is not None
|
|
||||||
await _apply_session_settings(session, global_config.database.database_type)
|
|
||||||
|
|
||||||
yield session
|
yield session
|
||||||
|
|
||||||
|
|
||||||
|
|||||||
@@ -1,7 +1,6 @@
|
|||||||
"""数据库优化层
|
"""数据库优化层
|
||||||
|
|
||||||
职责:
|
职责:
|
||||||
- 连接池管理
|
|
||||||
- 批量调度
|
- 批量调度
|
||||||
- 多级缓存
|
- 多级缓存
|
||||||
- 数据预加载
|
- 数据预加载
|
||||||
@@ -23,12 +22,6 @@ from .cache_manager import (
|
|||||||
close_cache,
|
close_cache,
|
||||||
get_cache,
|
get_cache,
|
||||||
)
|
)
|
||||||
from .connection_pool import (
|
|
||||||
ConnectionPoolManager,
|
|
||||||
get_connection_pool_manager,
|
|
||||||
start_connection_pool,
|
|
||||||
stop_connection_pool,
|
|
||||||
)
|
|
||||||
from .preloader import (
|
from .preloader import (
|
||||||
AccessPattern,
|
AccessPattern,
|
||||||
CommonDataPreloader,
|
CommonDataPreloader,
|
||||||
@@ -46,8 +39,6 @@ __all__ = [
|
|||||||
"CacheEntry",
|
"CacheEntry",
|
||||||
"CacheStats",
|
"CacheStats",
|
||||||
"CommonDataPreloader",
|
"CommonDataPreloader",
|
||||||
# Connection Pool
|
|
||||||
"ConnectionPoolManager",
|
|
||||||
# Preloader
|
# Preloader
|
||||||
"DataPreloader",
|
"DataPreloader",
|
||||||
"LRUCache",
|
"LRUCache",
|
||||||
@@ -59,8 +50,5 @@ __all__ = [
|
|||||||
"close_preloader",
|
"close_preloader",
|
||||||
"get_batch_scheduler",
|
"get_batch_scheduler",
|
||||||
"get_cache",
|
"get_cache",
|
||||||
"get_connection_pool_manager",
|
|
||||||
"get_preloader",
|
"get_preloader",
|
||||||
"start_connection_pool",
|
|
||||||
"stop_connection_pool",
|
|
||||||
]
|
]
|
||||||
|
|||||||
@@ -304,13 +304,11 @@ class MultiLevelCache:
|
|||||||
# 1. 尝试从L1获取
|
# 1. 尝试从L1获取
|
||||||
value = await self.l1_cache.get(key)
|
value = await self.l1_cache.get(key)
|
||||||
if value is not None:
|
if value is not None:
|
||||||
logger.debug(f"L1缓存命中: {key}")
|
|
||||||
return value
|
return value
|
||||||
|
|
||||||
# 2. 尝试从L2获取
|
# 2. 尝试从L2获取
|
||||||
value = await self.l2_cache.get(key)
|
value = await self.l2_cache.get(key)
|
||||||
if value is not None:
|
if value is not None:
|
||||||
logger.debug(f"L2缓存命中: {key}")
|
|
||||||
# 提升到L1
|
# 提升到L1
|
||||||
await self.l1_cache.set(key, value)
|
await self.l1_cache.set(key, value)
|
||||||
return value
|
return value
|
||||||
|
|||||||
@@ -1,299 +0,0 @@
|
|||||||
"""
|
|
||||||
透明连接复用管理器
|
|
||||||
|
|
||||||
在不改变原有API的情况下,实现数据库连接的智能复用
|
|
||||||
"""
|
|
||||||
|
|
||||||
import asyncio
|
|
||||||
import time
|
|
||||||
from contextlib import asynccontextmanager
|
|
||||||
from typing import Any
|
|
||||||
|
|
||||||
from sqlalchemy import text
|
|
||||||
from sqlalchemy.ext.asyncio import AsyncSession, async_sessionmaker
|
|
||||||
|
|
||||||
from src.common.logger import get_logger
|
|
||||||
|
|
||||||
logger = get_logger("database.connection_pool")
|
|
||||||
|
|
||||||
|
|
||||||
class ConnectionInfo:
|
|
||||||
"""连接信息包装器"""
|
|
||||||
|
|
||||||
def __init__(self, session: AsyncSession, created_at: float):
|
|
||||||
self.session = session
|
|
||||||
self.created_at = created_at
|
|
||||||
self.last_used = created_at
|
|
||||||
self.in_use = False
|
|
||||||
self.ref_count = 0
|
|
||||||
|
|
||||||
def mark_used(self):
|
|
||||||
"""标记连接被使用"""
|
|
||||||
self.last_used = time.time()
|
|
||||||
self.in_use = True
|
|
||||||
self.ref_count += 1
|
|
||||||
|
|
||||||
def mark_released(self):
|
|
||||||
"""标记连接被释放"""
|
|
||||||
self.in_use = False
|
|
||||||
self.ref_count = max(0, self.ref_count - 1)
|
|
||||||
|
|
||||||
def is_expired(self, max_lifetime: float = 300.0, max_idle: float = 60.0) -> bool:
|
|
||||||
"""检查连接是否过期"""
|
|
||||||
current_time = time.time()
|
|
||||||
|
|
||||||
# 检查总生命周期
|
|
||||||
if current_time - self.created_at > max_lifetime:
|
|
||||||
return True
|
|
||||||
|
|
||||||
# 检查空闲时间
|
|
||||||
if not self.in_use and current_time - self.last_used > max_idle:
|
|
||||||
return True
|
|
||||||
|
|
||||||
return False
|
|
||||||
|
|
||||||
async def close(self):
|
|
||||||
"""关闭连接"""
|
|
||||||
try:
|
|
||||||
# 使用 shield 保护 close 操作,确保即使任务被取消也能完成关闭
|
|
||||||
from typing import cast
|
|
||||||
await cast(asyncio.Future, asyncio.shield(self.session.close()))
|
|
||||||
logger.debug("连接已关闭")
|
|
||||||
except asyncio.CancelledError:
|
|
||||||
# 这是一个预期的行为,例如在流式聊天中断时
|
|
||||||
logger.debug("关闭连接时任务被取消")
|
|
||||||
raise
|
|
||||||
except Exception as e:
|
|
||||||
logger.warning(f"关闭连接时出错: {e}")
|
|
||||||
|
|
||||||
|
|
||||||
class ConnectionPoolManager:
|
|
||||||
"""透明的连接池管理器"""
|
|
||||||
|
|
||||||
def __init__(self, max_pool_size: int = 10, max_lifetime: float = 300.0, max_idle: float = 60.0):
|
|
||||||
self.max_pool_size = max_pool_size
|
|
||||||
self.max_lifetime = max_lifetime
|
|
||||||
self.max_idle = max_idle
|
|
||||||
|
|
||||||
# 连接池
|
|
||||||
self._connections: set[ConnectionInfo] = set()
|
|
||||||
self._lock = asyncio.Lock()
|
|
||||||
|
|
||||||
# 统计信息
|
|
||||||
self._stats = {
|
|
||||||
"total_created": 0,
|
|
||||||
"total_reused": 0,
|
|
||||||
"total_expired": 0,
|
|
||||||
"active_connections": 0,
|
|
||||||
"pool_hits": 0,
|
|
||||||
"pool_misses": 0,
|
|
||||||
}
|
|
||||||
|
|
||||||
# 后台清理任务
|
|
||||||
self._cleanup_task: asyncio.Task | None = None
|
|
||||||
self._should_cleanup = False
|
|
||||||
|
|
||||||
logger.info(f"连接池管理器初始化完成 (最大池大小: {max_pool_size})")
|
|
||||||
|
|
||||||
async def start(self):
|
|
||||||
"""启动连接池管理器"""
|
|
||||||
if self._cleanup_task is None:
|
|
||||||
self._should_cleanup = True
|
|
||||||
self._cleanup_task = asyncio.create_task(self._cleanup_loop())
|
|
||||||
logger.info("✅ 连接池管理器已启动")
|
|
||||||
|
|
||||||
async def stop(self):
|
|
||||||
"""停止连接池管理器"""
|
|
||||||
self._should_cleanup = False
|
|
||||||
|
|
||||||
if self._cleanup_task:
|
|
||||||
self._cleanup_task.cancel()
|
|
||||||
try:
|
|
||||||
await self._cleanup_task
|
|
||||||
except asyncio.CancelledError:
|
|
||||||
pass
|
|
||||||
self._cleanup_task = None
|
|
||||||
|
|
||||||
# 关闭所有连接
|
|
||||||
await self._close_all_connections()
|
|
||||||
logger.info("✅ 连接池管理器已停止")
|
|
||||||
|
|
||||||
@asynccontextmanager
|
|
||||||
async def get_session(self, session_factory: async_sessionmaker[AsyncSession]):
|
|
||||||
"""
|
|
||||||
获取数据库会话的透明包装器
|
|
||||||
如果有可用连接则复用,否则创建新连接
|
|
||||||
|
|
||||||
事务管理说明:
|
|
||||||
- 正常退出时自动提交事务
|
|
||||||
- 发生异常时自动回滚事务
|
|
||||||
- 如果用户代码已手动调用 commit/rollback,再次调用是安全的(空操作)
|
|
||||||
- 支持所有数据库类型:SQLite、PostgreSQL
|
|
||||||
"""
|
|
||||||
connection_info = None
|
|
||||||
|
|
||||||
try:
|
|
||||||
# 尝试获取现有连接
|
|
||||||
connection_info = await self._get_reusable_connection(session_factory)
|
|
||||||
|
|
||||||
if connection_info:
|
|
||||||
# 复用现有连接
|
|
||||||
connection_info.mark_used()
|
|
||||||
self._stats["total_reused"] += 1
|
|
||||||
self._stats["pool_hits"] += 1
|
|
||||||
logger.debug(f"♻️ 复用连接 (池大小: {len(self._connections)})")
|
|
||||||
else:
|
|
||||||
# 创建新连接
|
|
||||||
session = session_factory()
|
|
||||||
connection_info = ConnectionInfo(session, time.time())
|
|
||||||
|
|
||||||
async with self._lock:
|
|
||||||
self._connections.add(connection_info)
|
|
||||||
|
|
||||||
connection_info.mark_used()
|
|
||||||
self._stats["total_created"] += 1
|
|
||||||
self._stats["pool_misses"] += 1
|
|
||||||
logger.debug(f"🆕 创建连接 (池大小: {len(self._connections)})")
|
|
||||||
|
|
||||||
yield connection_info.session
|
|
||||||
|
|
||||||
# 🔧 正常退出时提交事务
|
|
||||||
# 这对所有数据库(SQLite、PostgreSQL)都很重要
|
|
||||||
# 因为 SQLAlchemy 默认使用事务模式,不会自动提交
|
|
||||||
# 注意:如果用户代码已调用 commit(),这里的 commit() 是安全的空操作
|
|
||||||
if connection_info and connection_info.session:
|
|
||||||
try:
|
|
||||||
# 检查事务是否处于活动状态,避免在已回滚的事务上提交
|
|
||||||
if connection_info.session.is_active:
|
|
||||||
await connection_info.session.commit()
|
|
||||||
except Exception as commit_error:
|
|
||||||
logger.warning(f"提交事务时出错: {commit_error}")
|
|
||||||
try:
|
|
||||||
await connection_info.session.rollback()
|
|
||||||
except Exception:
|
|
||||||
pass # 忽略回滚错误,因为事务可能已经结束
|
|
||||||
raise
|
|
||||||
|
|
||||||
except Exception:
|
|
||||||
# 发生错误时回滚连接
|
|
||||||
if connection_info and connection_info.session:
|
|
||||||
try:
|
|
||||||
# 检查是否需要回滚(事务是否活动)
|
|
||||||
if connection_info.session.is_active:
|
|
||||||
await connection_info.session.rollback()
|
|
||||||
except Exception as rollback_error:
|
|
||||||
logger.warning(f"回滚连接时出错: {rollback_error}")
|
|
||||||
raise
|
|
||||||
finally:
|
|
||||||
# 释放连接回池中
|
|
||||||
if connection_info:
|
|
||||||
connection_info.mark_released()
|
|
||||||
|
|
||||||
async def _get_reusable_connection(
|
|
||||||
self, session_factory: async_sessionmaker[AsyncSession]
|
|
||||||
) -> ConnectionInfo | None:
|
|
||||||
"""获取可复用的连接"""
|
|
||||||
async with self._lock:
|
|
||||||
# 清理过期连接
|
|
||||||
await self._cleanup_expired_connections_locked()
|
|
||||||
|
|
||||||
# 查找可复用的连接
|
|
||||||
for connection_info in list(self._connections):
|
|
||||||
if not connection_info.in_use and not connection_info.is_expired(self.max_lifetime, self.max_idle):
|
|
||||||
# 验证连接是否仍然有效
|
|
||||||
try:
|
|
||||||
# 执行一个简单的查询来验证连接
|
|
||||||
await connection_info.session.execute(text("SELECT 1"))
|
|
||||||
return connection_info
|
|
||||||
except Exception as e:
|
|
||||||
logger.debug(f"连接验证失败,将移除: {e}")
|
|
||||||
await connection_info.close()
|
|
||||||
self._connections.remove(connection_info)
|
|
||||||
self._stats["total_expired"] += 1
|
|
||||||
|
|
||||||
# 检查是否可以创建新连接
|
|
||||||
if len(self._connections) >= self.max_pool_size:
|
|
||||||
logger.warning(f"⚠️ 连接池已满 ({len(self._connections)}/{self.max_pool_size})")
|
|
||||||
return None
|
|
||||||
|
|
||||||
return None
|
|
||||||
|
|
||||||
async def _cleanup_expired_connections_locked(self):
|
|
||||||
"""清理过期连接(需要在锁内调用)"""
|
|
||||||
expired_connections = [
|
|
||||||
connection_info for connection_info in list(self._connections)
|
|
||||||
if connection_info.is_expired(self.max_lifetime, self.max_idle) and not connection_info.in_use
|
|
||||||
]
|
|
||||||
|
|
||||||
for connection_info in expired_connections:
|
|
||||||
await connection_info.close()
|
|
||||||
self._connections.remove(connection_info)
|
|
||||||
self._stats["total_expired"] += 1
|
|
||||||
|
|
||||||
if expired_connections:
|
|
||||||
logger.debug(f"🧹 清理了 {len(expired_connections)} 个过期连接")
|
|
||||||
|
|
||||||
async def _cleanup_loop(self):
|
|
||||||
"""后台清理循环"""
|
|
||||||
while self._should_cleanup:
|
|
||||||
try:
|
|
||||||
await asyncio.sleep(30.0) # 每30秒清理一次
|
|
||||||
|
|
||||||
async with self._lock:
|
|
||||||
await self._cleanup_expired_connections_locked()
|
|
||||||
|
|
||||||
# 更新统计信息
|
|
||||||
self._stats["active_connections"] = len(self._connections)
|
|
||||||
|
|
||||||
except asyncio.CancelledError:
|
|
||||||
break
|
|
||||||
except Exception as e:
|
|
||||||
logger.error(f"连接池清理循环出错: {e}")
|
|
||||||
await asyncio.sleep(10.0)
|
|
||||||
|
|
||||||
async def _close_all_connections(self):
|
|
||||||
"""关闭所有连接"""
|
|
||||||
async with self._lock:
|
|
||||||
for connection_info in list(self._connections):
|
|
||||||
await connection_info.close()
|
|
||||||
|
|
||||||
self._connections.clear()
|
|
||||||
logger.info("所有连接已关闭")
|
|
||||||
|
|
||||||
def get_stats(self) -> dict[str, Any]:
|
|
||||||
"""获取连接池统计信息"""
|
|
||||||
total_requests = self._stats["pool_hits"] + self._stats["pool_misses"]
|
|
||||||
pool_efficiency = (self._stats["pool_hits"] / max(1, total_requests)) * 100 if total_requests > 0 else 0
|
|
||||||
|
|
||||||
return {
|
|
||||||
**self._stats,
|
|
||||||
"active_connections": len(self._connections),
|
|
||||||
"max_pool_size": self.max_pool_size,
|
|
||||||
"pool_efficiency": f"{pool_efficiency:.2f}%",
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
# 全局连接池管理器实例
|
|
||||||
_connection_pool_manager: ConnectionPoolManager | None = None
|
|
||||||
|
|
||||||
|
|
||||||
def get_connection_pool_manager() -> ConnectionPoolManager:
|
|
||||||
"""获取全局连接池管理器实例"""
|
|
||||||
global _connection_pool_manager
|
|
||||||
if _connection_pool_manager is None:
|
|
||||||
_connection_pool_manager = ConnectionPoolManager()
|
|
||||||
return _connection_pool_manager
|
|
||||||
|
|
||||||
|
|
||||||
async def start_connection_pool():
|
|
||||||
"""启动连接池"""
|
|
||||||
manager = get_connection_pool_manager()
|
|
||||||
await manager.start()
|
|
||||||
|
|
||||||
|
|
||||||
async def stop_connection_pool():
|
|
||||||
"""停止连接池"""
|
|
||||||
global _connection_pool_manager
|
|
||||||
if _connection_pool_manager:
|
|
||||||
await _connection_pool_manager.stop()
|
|
||||||
_connection_pool_manager = None
|
|
||||||
Reference in New Issue
Block a user