diff --git a/src/chat/message_receive/chat_stream.py b/src/chat/message_receive/chat_stream.py
index b3d61a331..aee356156 100644
--- a/src/chat/message_receive/chat_stream.py
+++ b/src/chat/message_receive/chat_stream.py
@@ -129,16 +129,6 @@ class ChatStream:
         # 直接使用传入的 DatabaseMessages,设置到上下文中
         self.context.set_current_message(message)
 
-        # 调试日志
-        logger.debug(
-            f"消息上下文已设置 - message_id: {message.message_id}, "
-            f"chat_id: {message.chat_id}, "
-            f"is_mentioned: {message.is_mentioned}, "
-            f"is_emoji: {message.is_emoji}, "
-            f"is_picid: {message.is_picid}, "
-            f"interest_value: {message.interest_value}"
-        )
-
     def _safe_get_actions(self, message: DatabaseMessages) -> list | None:
         """安全获取消息的actions字段"""
         import json
diff --git a/src/common/data_models/message_manager_data_model.py b/src/common/data_models/message_manager_data_model.py
index 4c4955cef..cd91441da 100644
--- a/src/common/data_models/message_manager_data_model.py
+++ b/src/common/data_models/message_manager_data_model.py
@@ -616,20 +616,20 @@ class StreamContext(BaseDataModel):
             # 如果没有指定类型要求,默认为支持
             return True
 
-        logger.debug(f"[check_types] 检查消息是否支持类型: {types}")
+        # logger.debug(f"[check_types] 检查消息是否支持类型: {types}")  # 简化日志,避免冗余
 
         # 优先从additional_config中获取format_info
         if hasattr(self.current_message, "additional_config") and self.current_message.additional_config:
             import orjson
             try:
-                logger.debug(f"[check_types] additional_config 类型: {type(self.current_message.additional_config)}")
+                # logger.debug(f"[check_types] additional_config 类型: {type(self.current_message.additional_config)}")  # 简化日志
                 config = orjson.loads(self.current_message.additional_config)
-                logger.debug(f"[check_types] 解析后的 config 键: {config.keys() if isinstance(config, dict) else 'N/A'}")
+                # logger.debug(f"[check_types] 解析后的 config 键: {config.keys() if isinstance(config, dict) else 'N/A'}")  # 简化日志
 
                 # 检查format_info结构
                 if "format_info" in config:
                     format_info = config["format_info"]
-                    logger.debug(f"[check_types] 找到 format_info: {format_info}")
+                    # logger.debug(f"[check_types] 找到 format_info: {format_info}")  # 简化日志
 
                     # 方法1: 直接检查accept_format字段
                     if "accept_format" in format_info:
@@ -646,9 +646,9 @@ class StreamContext(BaseDataModel):
                         # 检查所有请求的类型是否都被支持
                         for requested_type in types:
                             if requested_type not in accept_format:
-                                logger.debug(f"[check_types] 消息不支持类型 '{requested_type}',支持的类型: {accept_format}")
+                                # logger.debug(f"[check_types] 消息不支持类型 '{requested_type}',支持的类型: {accept_format}")  # 简化日志
                                 return False
-                        logger.debug("[check_types] ✅ 消息支持所有请求的类型 (来自 accept_format)")
+                        # logger.debug("[check_types] ✅ 消息支持所有请求的类型 (来自 accept_format)")  # 简化日志
                         return True
 
                     # 方法2: 检查content_format字段(向后兼容)
@@ -665,9 +665,9 @@ class StreamContext(BaseDataModel):
                         # 检查所有请求的类型是否都被支持
                         for requested_type in types:
                             if requested_type not in content_format:
-                                logger.debug(f"[check_types] 消息不支持类型 '{requested_type}',支持的内容格式: {content_format}")
+                                # logger.debug(f"[check_types] 消息不支持类型 '{requested_type}',支持的内容格式: {content_format}")  # 简化日志
                                 return False
-                        logger.debug("[check_types] ✅ 消息支持所有请求的类型 (来自 content_format)")
+                        # logger.debug("[check_types] ✅ 消息支持所有请求的类型 (来自 content_format)")  # 简化日志
                         return True
                 else:
                     logger.warning("[check_types] [问题] additional_config 中没有 format_info 字段")
@@ -679,16 +679,16 @@ class StreamContext(BaseDataModel):
 
         # 备用方案:如果无法从additional_config获取格式信息,使用默认支持的类型
         # 大多数消息至少支持text类型
-        logger.debug("[check_types] 使用备用方案:默认支持类型检查")
+        # logger.debug("[check_types] 使用备用方案:默认支持类型检查")  # 简化日志
         default_supported_types = ["text", "emoji"]
 
         for requested_type in types:
             if requested_type not in default_supported_types:
-                logger.debug(f"[check_types] 使用默认类型检查,消息可能不支持类型 '{requested_type}'")
+                # logger.debug(f"[check_types] 使用默认类型检查,消息可能不支持类型 '{requested_type}'")  # 简化日志
                 # 对于非基础类型,返回False以避免错误
                 if requested_type not in ["text", "emoji", "reply"]:
                     logger.warning(f"[check_types] ❌ 备用方案拒绝类型 '{requested_type}'")
                     return False
-        logger.debug("[check_types] ✅ 备用方案通过所有类型检查")
+        # logger.debug("[check_types] ✅ 备用方案通过所有类型检查")  # 简化日志
         return True
 
     # ==================== 消息缓存系统方法 ====================
@@ -736,7 +736,7 @@ class StreamContext(BaseDataModel):
             list[DatabaseMessages]: 刷新的消息列表
         """
         if not self.message_cache:
-            logger.debug(f"StreamContext {self.stream_id} 缓存为空,无需刷新")
+            # 缓存为空是正常情况,不需要记录日志
             return []
 
         try:
diff --git a/src/common/database/connection_pool_manager.py b/src/common/database/connection_pool_manager.py
deleted file mode 100644
index 73fd5335c..000000000
--- a/src/common/database/connection_pool_manager.py
+++ /dev/null
@@ -1,281 +0,0 @@
-"""
-透明连接复用管理器
-在不改变原有API的情况下,实现数据库连接的智能复用
-"""
-
-import asyncio
-import time
-from contextlib import asynccontextmanager
-from typing import Any
-
-from sqlalchemy import text
-from sqlalchemy.ext.asyncio import AsyncSession, async_sessionmaker
-
-from src.common.logger import get_logger
-
-logger = get_logger("connection_pool_manager")
-
-
-class ConnectionInfo:
-    """连接信息包装器"""
-
-    def __init__(self, session: AsyncSession, created_at: float):
-        self.session = session
-        self.created_at = created_at
-        self.last_used = created_at
-        self.in_use = False
-        self.ref_count = 0
-
-    def mark_used(self):
-        """标记连接被使用"""
-        self.last_used = time.time()
-        self.in_use = True
-        self.ref_count += 1
-
-    def mark_released(self):
-        """标记连接被释放"""
-        self.in_use = False
-        self.ref_count = max(0, self.ref_count - 1)
-
-    def is_expired(self, max_lifetime: float = 300.0, max_idle: float = 60.0) -> bool:
-        """检查连接是否过期"""
-        current_time = time.time()
-
-        # 检查总生命周期
-        if current_time - self.created_at > max_lifetime:
-            return True
-
-        # 检查空闲时间
-        if not self.in_use and current_time - self.last_used > max_idle:
-            return True
-
-        return False
-
-    async def close(self):
-        """关闭连接"""
-        try:
-            # 使用 shield 保护 close 操作,确保即使任务被取消也能完成关闭
-            # 通过 `cast` 明确告知类型检查器 `shield` 的返回类型,避免类型错误
-            from typing import cast
-            await cast(asyncio.Future, asyncio.shield(self.session.close()))
-            logger.debug("连接已关闭")
-        except asyncio.CancelledError:
-            # 这是一个预期的行为,例如在流式聊天中断时
-            logger.debug("关闭连接时任务被取消")
-            # 重新抛出异常以确保任务状态正确
-            raise
-        except Exception as e:
-            logger.warning(f"关闭连接时出错: {e}")
-
-
-class ConnectionPoolManager:
-    """透明的连接池管理器"""
-
-    def __init__(self, max_pool_size: int = 10, max_lifetime: float = 300.0, max_idle: float = 60.0):
-        self.max_pool_size = max_pool_size
-        self.max_lifetime = max_lifetime
-        self.max_idle = max_idle
-
-        # 连接池
-        self._connections: set[ConnectionInfo] = set()
-        self._lock = asyncio.Lock()
-
-        # 统计信息
-        self._stats = {
-            "total_created": 0,
-            "total_reused": 0,
-            "total_expired": 0,
-            "active_connections": 0,
-            "pool_hits": 0,
-            "pool_misses": 0,
-        }
-
-        # 后台清理任务
-        self._cleanup_task: asyncio.Task | None = None
-        self._should_cleanup = False
-
-        logger.info(f"连接池管理器初始化完成 (最大池大小: {max_pool_size})")
-
-    async def start(self):
-        """启动连接池管理器"""
-        if self._cleanup_task is None:
-            self._should_cleanup = True
-            self._cleanup_task = asyncio.create_task(self._cleanup_loop())
-            logger.info("连接池管理器已启动")
-
-    async def stop(self):
-        """停止连接池管理器"""
-        self._should_cleanup = False
-
-        if self._cleanup_task:
-            self._cleanup_task.cancel()
-            try:
-                await self._cleanup_task
-            except asyncio.CancelledError:
-                pass
-            self._cleanup_task = None
-
-        # 关闭所有连接
-        await self._close_all_connections()
-        logger.info("连接池管理器已停止")
-
-    @asynccontextmanager
-    async def get_session(self, session_factory: async_sessionmaker[AsyncSession]):
-        """
-        获取数据库会话的透明包装器
-        如果有可用连接则复用,否则创建新连接
-        """
-        connection_info = None
-
-        try:
-            # 尝试获取现有连接
-            connection_info = await self._get_reusable_connection(session_factory)
-
-            if connection_info:
-                # 复用现有连接
-                connection_info.mark_used()
-                self._stats["total_reused"] += 1
-                self._stats["pool_hits"] += 1
-                logger.debug(f"复用现有连接 (活跃连接数: {len(self._connections)})")
-            else:
-                # 创建新连接
-                session = session_factory()
-                connection_info = ConnectionInfo(session, time.time())
-
-                async with self._lock:
-                    self._connections.add(connection_info)
-
-                connection_info.mark_used()
-                self._stats["total_created"] += 1
-                self._stats["pool_misses"] += 1
-                logger.debug(f"创建新连接 (活跃连接数: {len(self._connections)})")
-
-            yield connection_info.session
-
-        except Exception:
-            # 发生错误时回滚连接
-            if connection_info and connection_info.session:
-                try:
-                    await connection_info.session.rollback()
-                except Exception as rollback_error:
-                    logger.warning(f"回滚连接时出错: {rollback_error}")
-            raise
-        finally:
-            # 释放连接回池中
-            if connection_info:
-                connection_info.mark_released()
-
-    async def _get_reusable_connection(
-        self, session_factory: async_sessionmaker[AsyncSession]
-    ) -> ConnectionInfo | None:
-        """获取可复用的连接"""
-        # 导入方言适配器获取 ping 查询
-        from src.common.database.core.dialect_adapter import DialectAdapter
-
-        ping_query = DialectAdapter.get_ping_query()
-
-        async with self._lock:
-            # 清理过期连接
-            await self._cleanup_expired_connections_locked()
-
-            # 查找可复用的连接
-            for connection_info in list(self._connections):
-                if not connection_info.in_use and not connection_info.is_expired(self.max_lifetime, self.max_idle):
-                    # 验证连接是否仍然有效
-                    try:
-                        # 执行 ping 查询来验证连接
-                        await connection_info.session.execute(text(ping_query))
-                        return connection_info
-                    except Exception as e:
-                        logger.debug(f"连接验证失败,将移除: {e}")
-                        await connection_info.close()
-                        self._connections.remove(connection_info)
-                        self._stats["total_expired"] += 1
-
-            # 检查是否可以创建新连接
-            if len(self._connections) >= self.max_pool_size:
-                logger.warning(f"连接池已满 ({len(self._connections)}/{self.max_pool_size}),等待复用")
-                return None
-
-            return None
-
-    async def _cleanup_expired_connections_locked(self):
-        """清理过期连接(需要在锁内调用)"""
-        time.time()
-        expired_connections = [
-            connection_info for connection_info in list(self._connections)
-            if connection_info.is_expired(self.max_lifetime, self.max_idle) and not connection_info.in_use
-        ]
-
-        for connection_info in expired_connections:
-            await connection_info.close()
-            self._connections.remove(connection_info)
-            self._stats["total_expired"] += 1
-
-        if expired_connections:
-            logger.debug(f"清理了 {len(expired_connections)} 个过期连接")
-
-    async def _cleanup_loop(self):
-        """后台清理循环"""
-        while self._should_cleanup:
-            try:
-                await asyncio.sleep(30.0)  # 每30秒清理一次
-
-                async with self._lock:
-                    await self._cleanup_expired_connections_locked()
-
-                # 更新统计信息
-                self._stats["active_connections"] = len(self._connections)
-
-            except asyncio.CancelledError:
-                break
-            except Exception as e:
-                logger.error(f"连接池清理循环出错: {e}")
-                await asyncio.sleep(10.0)
-
-    async def _close_all_connections(self):
-        """关闭所有连接"""
-        async with self._lock:
-            for connection_info in list(self._connections):
-                await connection_info.close()
-
-            self._connections.clear()
-        logger.info("所有连接已关闭")
-
-    def get_stats(self) -> dict[str, Any]:
-        """获取连接池统计信息"""
-        return {
-            **self._stats,
-            "active_connections": len(self._connections),
-            "max_pool_size": self.max_pool_size,
-            "pool_efficiency": (
-                self._stats["pool_hits"] / max(1, self._stats["pool_hits"] + self._stats["pool_misses"])
-            )
-            * 100,
-        }
-
-
-# 全局连接池管理器实例
-_connection_pool_manager: ConnectionPoolManager | None = None
-
-
-def get_connection_pool_manager() -> ConnectionPoolManager:
-    """获取全局连接池管理器实例"""
-    global _connection_pool_manager
-    if _connection_pool_manager is None:
-        _connection_pool_manager = ConnectionPoolManager()
-    return _connection_pool_manager
-
-
-async def start_connection_pool():
-    """启动连接池"""
-    manager = get_connection_pool_manager()
-    await manager.start()
-
-
-async def stop_connection_pool():
-    """停止连接池"""
-    global _connection_pool_manager
-    if _connection_pool_manager:
-        await _connection_pool_manager.stop()
-        _connection_pool_manager = None
diff --git a/src/common/database/core/session.py b/src/common/database/core/session.py
index e3df1a03e..274472b60 100644
--- a/src/common/database/core/session.py
+++ b/src/common/database/core/session.py
@@ -87,7 +87,7 @@ async def _apply_session_settings(session: AsyncSession, db_type: str) -> None:
 async def get_db_session() -> AsyncGenerator[AsyncSession, None]:
     """获取数据库会话上下文管理器
 
-    这是数据库操作的主要入口点,通过连接池管理器提供透明的连接复用。
+    这是数据库操作的主要入口点,直接从会话工厂获取独立会话。
 
     支持的数据库:
     - SQLite: 自动设置 busy_timeout 和外键约束
@@ -101,20 +101,7 @@ async def get_db_session() -> AsyncGenerator[AsyncSession, None]:
     Yields:
         AsyncSession: SQLAlchemy异步会话对象
     """
-    # 延迟导入避免循环依赖
-    from ..optimization.connection_pool import get_connection_pool_manager
-
-    session_factory = await get_session_factory()
-    pool_manager = get_connection_pool_manager()
-
-    # 使用连接池管理器(透明复用连接)
-    async with pool_manager.get_session(session_factory) as session:
-        # 获取数据库类型并应用特定设置
-        from src.config.config import global_config
-
-        assert global_config is not None
-        await _apply_session_settings(session, global_config.database.database_type)
-
+    async with get_db_session_direct() as session:
         yield session
diff --git a/src/common/database/optimization/__init__.py b/src/common/database/optimization/__init__.py
index 7cd0c99df..429f9cb78 100644
--- a/src/common/database/optimization/__init__.py
+++ b/src/common/database/optimization/__init__.py
@@ -1,7 +1,6 @@
 """数据库优化层
 
 职责:
-- 连接池管理
 - 批量调度
 - 多级缓存
 - 数据预加载
@@ -23,12 +22,6 @@ from .cache_manager import (
     close_cache,
     get_cache,
 )
-from .connection_pool import (
-    ConnectionPoolManager,
-    get_connection_pool_manager,
-    start_connection_pool,
-    stop_connection_pool,
-)
 from .preloader import (
     AccessPattern,
     CommonDataPreloader,
@@ -46,8 +39,6 @@ __all__ = [
     "CacheEntry",
     "CacheStats",
     "CommonDataPreloader",
-    # Connection Pool
-    "ConnectionPoolManager",
     # Preloader
     "DataPreloader",
     "LRUCache",
@@ -59,8 +50,5 @@ __all__ = [
     "close_preloader",
     "get_batch_scheduler",
     "get_cache",
-    "get_connection_pool_manager",
     "get_preloader",
-    "start_connection_pool",
-    "stop_connection_pool",
 ]
diff --git a/src/common/database/optimization/cache_manager.py b/src/common/database/optimization/cache_manager.py
index eed9783a5..30ad34bda 100644
--- a/src/common/database/optimization/cache_manager.py
+++ b/src/common/database/optimization/cache_manager.py
@@ -304,13 +304,11 @@ class MultiLevelCache:
         # 1. 尝试从L1获取
         value = await self.l1_cache.get(key)
         if value is not None:
-            logger.debug(f"L1缓存命中: {key}")
             return value
 
         # 2. 尝试从L2获取
         value = await self.l2_cache.get(key)
         if value is not None:
-            logger.debug(f"L2缓存命中: {key}")
            # 提升到L1
             await self.l1_cache.set(key, value)
             return value
diff --git a/src/common/database/optimization/connection_pool.py b/src/common/database/optimization/connection_pool.py
deleted file mode 100644
index a00335af3..000000000
--- a/src/common/database/optimization/connection_pool.py
+++ /dev/null
@@ -1,299 +0,0 @@
-"""
-透明连接复用管理器
-
-在不改变原有API的情况下,实现数据库连接的智能复用
-"""
-
-import asyncio
-import time
-from contextlib import asynccontextmanager
-from typing import Any
-
-from sqlalchemy import text
-from sqlalchemy.ext.asyncio import AsyncSession, async_sessionmaker
-
-from src.common.logger import get_logger
-
-logger = get_logger("database.connection_pool")
-
-
-class ConnectionInfo:
-    """连接信息包装器"""
-
-    def __init__(self, session: AsyncSession, created_at: float):
-        self.session = session
-        self.created_at = created_at
-        self.last_used = created_at
-        self.in_use = False
-        self.ref_count = 0
-
-    def mark_used(self):
-        """标记连接被使用"""
-        self.last_used = time.time()
-        self.in_use = True
-        self.ref_count += 1
-
-    def mark_released(self):
-        """标记连接被释放"""
-        self.in_use = False
-        self.ref_count = max(0, self.ref_count - 1)
-
-    def is_expired(self, max_lifetime: float = 300.0, max_idle: float = 60.0) -> bool:
-        """检查连接是否过期"""
-        current_time = time.time()
-
-        # 检查总生命周期
-        if current_time - self.created_at > max_lifetime:
-            return True
-
-        # 检查空闲时间
-        if not self.in_use and current_time - self.last_used > max_idle:
-            return True
-
-        return False
-
-    async def close(self):
-        """关闭连接"""
-        try:
-            # 使用 shield 保护 close 操作,确保即使任务被取消也能完成关闭
-            from typing import cast
-            await cast(asyncio.Future, asyncio.shield(self.session.close()))
-            logger.debug("连接已关闭")
-        except asyncio.CancelledError:
-            # 这是一个预期的行为,例如在流式聊天中断时
-            logger.debug("关闭连接时任务被取消")
-            raise
-        except Exception as e:
-            logger.warning(f"关闭连接时出错: {e}")
-
-
-class ConnectionPoolManager:
-    """透明的连接池管理器"""
-
-    def __init__(self, max_pool_size: int = 10, max_lifetime: float = 300.0, max_idle: float = 60.0):
-        self.max_pool_size = max_pool_size
-        self.max_lifetime = max_lifetime
-        self.max_idle = max_idle
-
-        # 连接池
-        self._connections: set[ConnectionInfo] = set()
-        self._lock = asyncio.Lock()
-
-        # 统计信息
-        self._stats = {
-            "total_created": 0,
-            "total_reused": 0,
-            "total_expired": 0,
-            "active_connections": 0,
-            "pool_hits": 0,
-            "pool_misses": 0,
-        }
-
-        # 后台清理任务
-        self._cleanup_task: asyncio.Task | None = None
-        self._should_cleanup = False
-
-        logger.info(f"连接池管理器初始化完成 (最大池大小: {max_pool_size})")
-
-    async def start(self):
-        """启动连接池管理器"""
-        if self._cleanup_task is None:
-            self._should_cleanup = True
-            self._cleanup_task = asyncio.create_task(self._cleanup_loop())
-            logger.info("✅ 连接池管理器已启动")
-
-    async def stop(self):
-        """停止连接池管理器"""
-        self._should_cleanup = False
-
-        if self._cleanup_task:
-            self._cleanup_task.cancel()
-            try:
-                await self._cleanup_task
-            except asyncio.CancelledError:
-                pass
-            self._cleanup_task = None
-
-        # 关闭所有连接
-        await self._close_all_connections()
-        logger.info("✅ 连接池管理器已停止")
-
-    @asynccontextmanager
-    async def get_session(self, session_factory: async_sessionmaker[AsyncSession]):
-        """
-        获取数据库会话的透明包装器
-        如果有可用连接则复用,否则创建新连接
-
-        事务管理说明:
-        - 正常退出时自动提交事务
-        - 发生异常时自动回滚事务
-        - 如果用户代码已手动调用 commit/rollback,再次调用是安全的(空操作)
-        - 支持所有数据库类型:SQLite、PostgreSQL
-        """
-        connection_info = None
-
-        try:
-            # 尝试获取现有连接
-            connection_info = await self._get_reusable_connection(session_factory)
-
-            if connection_info:
-                # 复用现有连接
-                connection_info.mark_used()
-                self._stats["total_reused"] += 1
-                self._stats["pool_hits"] += 1
-                logger.debug(f"♻️ 复用连接 (池大小: {len(self._connections)})")
-            else:
-                # 创建新连接
-                session = session_factory()
-                connection_info = ConnectionInfo(session, time.time())
-
-                async with self._lock:
-                    self._connections.add(connection_info)
-
-                connection_info.mark_used()
-                self._stats["total_created"] += 1
-                self._stats["pool_misses"] += 1
-                logger.debug(f"🆕 创建连接 (池大小: {len(self._connections)})")
-
-            yield connection_info.session
-
-            # 🔧 正常退出时提交事务
-            # 这对所有数据库(SQLite、PostgreSQL)都很重要
-            # 因为 SQLAlchemy 默认使用事务模式,不会自动提交
-            # 注意:如果用户代码已调用 commit(),这里的 commit() 是安全的空操作
-            if connection_info and connection_info.session:
-                try:
-                    # 检查事务是否处于活动状态,避免在已回滚的事务上提交
-                    if connection_info.session.is_active:
-                        await connection_info.session.commit()
-                except Exception as commit_error:
-                    logger.warning(f"提交事务时出错: {commit_error}")
-                    try:
-                        await connection_info.session.rollback()
-                    except Exception:
-                        pass  # 忽略回滚错误,因为事务可能已经结束
-                    raise
-
-        except Exception:
-            # 发生错误时回滚连接
-            if connection_info and connection_info.session:
-                try:
-                    # 检查是否需要回滚(事务是否活动)
-                    if connection_info.session.is_active:
-                        await connection_info.session.rollback()
-                except Exception as rollback_error:
-                    logger.warning(f"回滚连接时出错: {rollback_error}")
-            raise
-        finally:
-            # 释放连接回池中
-            if connection_info:
-                connection_info.mark_released()
-
-    async def _get_reusable_connection(
-        self, session_factory: async_sessionmaker[AsyncSession]
-    ) -> ConnectionInfo | None:
-        """获取可复用的连接"""
-        async with self._lock:
-            # 清理过期连接
-            await self._cleanup_expired_connections_locked()
-
-            # 查找可复用的连接
-            for connection_info in list(self._connections):
-                if not connection_info.in_use and not connection_info.is_expired(self.max_lifetime, self.max_idle):
-                    # 验证连接是否仍然有效
-                    try:
-                        # 执行一个简单的查询来验证连接
-                        await connection_info.session.execute(text("SELECT 1"))
-                        return connection_info
-                    except Exception as e:
-                        logger.debug(f"连接验证失败,将移除: {e}")
-                        await connection_info.close()
-                        self._connections.remove(connection_info)
-                        self._stats["total_expired"] += 1
-
-            # 检查是否可以创建新连接
-            if len(self._connections) >= self.max_pool_size:
-                logger.warning(f"⚠️ 连接池已满 ({len(self._connections)}/{self.max_pool_size})")
-                return None
-
-            return None
-
-    async def _cleanup_expired_connections_locked(self):
-        """清理过期连接(需要在锁内调用)"""
-        expired_connections = [
-            connection_info for connection_info in list(self._connections)
-            if connection_info.is_expired(self.max_lifetime, self.max_idle) and not connection_info.in_use
-        ]
-
-        for connection_info in expired_connections:
-            await connection_info.close()
-            self._connections.remove(connection_info)
-            self._stats["total_expired"] += 1
-
-        if expired_connections:
-            logger.debug(f"🧹 清理了 {len(expired_connections)} 个过期连接")
-
-    async def _cleanup_loop(self):
-        """后台清理循环"""
-        while self._should_cleanup:
-            try:
-                await asyncio.sleep(30.0)  # 每30秒清理一次
-
-                async with self._lock:
-                    await self._cleanup_expired_connections_locked()
-
-                # 更新统计信息
-                self._stats["active_connections"] = len(self._connections)
-
-            except asyncio.CancelledError:
-                break
-            except Exception as e:
-                logger.error(f"连接池清理循环出错: {e}")
-                await asyncio.sleep(10.0)
-
-    async def _close_all_connections(self):
-        """关闭所有连接"""
-        async with self._lock:
-            for connection_info in list(self._connections):
-                await connection_info.close()
-
-            self._connections.clear()
-        logger.info("所有连接已关闭")
-
-    def get_stats(self) -> dict[str, Any]:
-        """获取连接池统计信息"""
-        total_requests = self._stats["pool_hits"] + self._stats["pool_misses"]
-        pool_efficiency = (self._stats["pool_hits"] / max(1, total_requests)) * 100 if total_requests > 0 else 0
-
-        return {
-            **self._stats,
-            "active_connections": len(self._connections),
-            "max_pool_size": self.max_pool_size,
-            "pool_efficiency": f"{pool_efficiency:.2f}%",
-        }
-
-
-# 全局连接池管理器实例
-_connection_pool_manager: ConnectionPoolManager | None = None
-
-
-def get_connection_pool_manager() -> ConnectionPoolManager:
-    """获取全局连接池管理器实例"""
-    global _connection_pool_manager
-    if _connection_pool_manager is None:
-        _connection_pool_manager = ConnectionPoolManager()
-    return _connection_pool_manager
-
-
-async def start_connection_pool():
-    """启动连接池"""
-    manager = get_connection_pool_manager()
-    await manager.start()
-
-
-async def stop_connection_pool():
-    """停止连接池"""
-    global _connection_pool_manager
-    if _connection_pool_manager:
-        await _connection_pool_manager.stop()
-        _connection_pool_manager = None
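
Not part of the patch itself: a minimal usage sketch of the simplified `get_db_session` entry point after this change. It assumes `get_db_session` keeps behaving as an async context manager (its docstring and `yield session` body suggest an `@asynccontextmanager`-style wrapper) and that `get_db_session_direct` hands out an independent session from the factory; the `messages` table name is purely illustrative.

```python
# Hedged sketch, not from the diff: callers keep using get_db_session() unchanged;
# connection reuse is now left to SQLAlchemy's engine-level pool rather than the
# removed ConnectionPoolManager, and transaction control (commit/rollback) is
# whatever get_db_session_direct provides for each independent session.
import asyncio

from sqlalchemy import text

from src.common.database.core.session import get_db_session


async def count_rows() -> int:
    # Each `async with` block obtains its own AsyncSession via get_db_session_direct().
    async with get_db_session() as session:
        # "messages" is a placeholder table name for illustration only.
        result = await session.execute(text("SELECT COUNT(*) FROM messages"))
        return result.scalar_one()


if __name__ == "__main__":
    print(asyncio.run(count_rows()))
```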