This commit is contained in:
明天好像没什么
2025-11-07 21:01:45 +08:00
committed by Windpicker-owo
parent 5702dd8a9f
commit 26d22e5dd8
45 changed files with 675 additions and 681 deletions

View File

@@ -368,13 +368,13 @@ class CacheManager:
if expired_keys:
logger.info(f"清理了 {len(expired_keys)} 个过期的L1缓存条目")
def get_health_stats(self) -> dict[str, Any]:
"""获取缓存健康统计信息"""
# 简化的健康统计,不包含内存监控(因为相关属性未定义)
return {
"l1_count": len(self.l1_kv_cache),
"l1_vector_count": self.l1_vector_index.ntotal if hasattr(self.l1_vector_index, 'ntotal') else 0,
"l1_vector_count": self.l1_vector_index.ntotal if hasattr(self.l1_vector_index, "ntotal") else 0,
"tool_stats": {
"total_tool_calls": self.tool_stats.get("total_tool_calls", 0),
"tracked_tools": len(self.tool_stats.get("most_used_tools", {})),
@@ -397,7 +397,7 @@ class CacheManager:
warnings.append(f"⚠️ L1缓存条目数较多: {l1_size}")
# 检查向量索引大小
vector_count = self.l1_vector_index.ntotal if hasattr(self.l1_vector_index, 'ntotal') else 0
vector_count = self.l1_vector_index.ntotal if hasattr(self.l1_vector_index, "ntotal") else 0
if isinstance(vector_count, int) and vector_count > 500:
warnings.append(f"⚠️ 向量索引条目数较多: {vector_count}")

View File

@@ -66,7 +66,7 @@ class BatchStats:
last_batch_duration: float = 0.0
last_batch_size: int = 0
congestion_score: float = 0.0 # 拥塞评分 (0-1)
# 🔧 新增:缓存统计
cache_size: int = 0 # 缓存条目数
cache_memory_mb: float = 0.0 # 缓存内存占用MB
@@ -539,8 +539,7 @@ class AdaptiveBatchScheduler:
def _set_cache(self, cache_key: str, result: Any) -> None:
"""设置缓存(改进版,带大小限制和内存统计)"""
import sys
# 🔧 检查缓存大小限制
if len(self._result_cache) >= self._cache_max_size:
# 首先清理过期条目
@@ -549,18 +548,18 @@ class AdaptiveBatchScheduler:
k for k, (_, ts) in self._result_cache.items()
if current_time - ts >= self.cache_ttl
]
for k in expired_keys:
# 更新内存统计
if k in self._cache_size_map:
self._cache_memory_estimate -= self._cache_size_map[k]
del self._cache_size_map[k]
del self._result_cache[k]
# 如果还是太大清理最老的条目LRU
if len(self._result_cache) >= self._cache_max_size:
oldest_key = min(
self._result_cache.keys(),
self._result_cache.keys(),
key=lambda k: self._result_cache[k][1]
)
# 更新内存统计
@@ -569,7 +568,7 @@ class AdaptiveBatchScheduler:
del self._cache_size_map[oldest_key]
del self._result_cache[oldest_key]
logger.debug(f"缓存已满,淘汰最老条目: {oldest_key}")
# 🔧 使用准确的内存估算方法
try:
total_size = estimate_size_smart(cache_key) + estimate_size_smart(result)
@@ -580,7 +579,7 @@ class AdaptiveBatchScheduler:
# 使用默认值
self._cache_size_map[cache_key] = 1024
self._cache_memory_estimate += 1024
self._result_cache[cache_key] = (result, time.time())
async def get_stats(self) -> BatchStats:

View File

@@ -171,7 +171,7 @@ class LRUCache(Generic[T]):
)
else:
adjusted_created_at = now
entry = CacheEntry(
value=value,
created_at=adjusted_created_at,
@@ -345,7 +345,7 @@ class MultiLevelCache:
# 估算数据大小(如果未提供)
if size is None:
size = estimate_size_smart(value)
# 检查单个条目大小是否超过限制
if size > self.max_item_size_bytes:
logger.warning(
@@ -354,7 +354,7 @@ class MultiLevelCache:
f"limit={self.max_item_size_bytes / (1024 * 1024):.2f}MB"
)
return
# 根据TTL决定写入哪个缓存层
if ttl is not None:
# 有自定义TTL根据TTL大小决定写入层级
@@ -394,37 +394,37 @@ class MultiLevelCache:
"""获取所有缓存层的统计信息(修正版,避免重复计数)"""
l1_stats = await self.l1_cache.get_stats()
l2_stats = await self.l2_cache.get_stats()
# 🔧 修复计算实际独占的内存避免L1和L2共享数据的重复计数
l1_keys = set(self.l1_cache._cache.keys())
l2_keys = set(self.l2_cache._cache.keys())
shared_keys = l1_keys & l2_keys
l1_only_keys = l1_keys - l2_keys
l2_only_keys = l2_keys - l1_keys
# 计算实际总内存(避免重复计数)
# L1独占内存
l1_only_size = sum(
self.l1_cache._cache[k].size
for k in l1_only_keys
self.l1_cache._cache[k].size
for k in l1_only_keys
if k in self.l1_cache._cache
)
# L2独占内存
l2_only_size = sum(
self.l2_cache._cache[k].size
for k in l2_only_keys
self.l2_cache._cache[k].size
for k in l2_only_keys
if k in self.l2_cache._cache
)
# 共享内存只计算一次使用L1的数据
shared_size = sum(
self.l1_cache._cache[k].size
for k in shared_keys
self.l1_cache._cache[k].size
for k in shared_keys
if k in self.l1_cache._cache
)
actual_total_size = l1_only_size + l2_only_size + shared_size
return {
"l1": l1_stats,
"l2": l2_stats,
@@ -442,7 +442,7 @@ class MultiLevelCache:
"""检查并强制清理超出内存限制的缓存"""
stats = await self.get_stats()
total_size = stats["l1"].total_size + stats["l2"].total_size
if total_size > self.max_memory_bytes:
memory_mb = total_size / (1024 * 1024)
max_mb = self.max_memory_bytes / (1024 * 1024)
@@ -452,14 +452,14 @@ class MultiLevelCache:
)
# 优先清理L2缓存温数据
await self.l2_cache.clear()
# 如果清理L2后仍超限清理L1
stats_after_l2 = await self.get_stats()
total_after_l2 = stats_after_l2["l1"].total_size + stats_after_l2["l2"].total_size
if total_after_l2 > self.max_memory_bytes:
logger.warning("清理L2后仍超限继续清理L1缓存")
await self.l1_cache.clear()
logger.info("缓存强制清理完成")
async def start_cleanup_task(self, interval: float = 60) -> None:
@@ -476,10 +476,10 @@ class MultiLevelCache:
while not self._is_closing:
try:
await asyncio.sleep(interval)
if self._is_closing:
break
stats = await self.get_stats()
l1_stats = stats["l1"]
l2_stats = stats["l2"]
@@ -493,13 +493,13 @@ class MultiLevelCache:
f"共享: {stats['shared_keys_count']}键/{stats['shared_mb']:.2f}MB "
f"(去重节省{stats['dedup_savings_mb']:.2f}MB)"
)
# 🔧 清理过期条目
await self._clean_expired_entries()
# 检查内存限制
await self.check_memory_limit()
except asyncio.CancelledError:
break
except Exception as e:
@@ -511,7 +511,7 @@ class MultiLevelCache:
async def stop_cleanup_task(self) -> None:
"""停止清理任务"""
self._is_closing = True
if self._cleanup_task is not None:
self._cleanup_task.cancel()
try:
@@ -520,43 +520,43 @@ class MultiLevelCache:
pass
self._cleanup_task = None
logger.info("缓存清理任务已停止")
async def _clean_expired_entries(self) -> None:
"""清理过期的缓存条目"""
try:
current_time = time.time()
# 清理 L1 过期条目
async with self.l1_cache._lock:
expired_keys = [
key for key, entry in self.l1_cache._cache.items()
if current_time - entry.created_at > self.l1_cache.ttl
]
for key in expired_keys:
entry = self.l1_cache._cache.pop(key, None)
if entry:
self.l1_cache._stats.evictions += 1
self.l1_cache._stats.item_count -= 1
self.l1_cache._stats.total_size -= entry.size
# 清理 L2 过期条目
async with self.l2_cache._lock:
expired_keys = [
key for key, entry in self.l2_cache._cache.items()
if current_time - entry.created_at > self.l2_cache.ttl
]
for key in expired_keys:
entry = self.l2_cache._cache.pop(key, None)
if entry:
self.l2_cache._stats.evictions += 1
self.l2_cache._stats.item_count -= 1
self.l2_cache._stats.total_size -= entry.size
if expired_keys:
logger.debug(f"清理了 {len(expired_keys)} 个过期缓存条目")
except Exception as e:
logger.error(f"清理过期条目失败: {e}", exc_info=True)
@@ -568,7 +568,7 @@ _cache_lock = asyncio.Lock()
async def get_cache() -> MultiLevelCache:
"""获取全局缓存实例(单例)
从配置文件读取缓存参数,如果配置未加载则使用默认值
如果配置中禁用了缓存返回一个最小化的缓存实例容量为1
"""
@@ -580,9 +580,9 @@ async def get_cache() -> MultiLevelCache:
# 尝试从配置读取参数
try:
from src.config.config import global_config
db_config = global_config.database
# 检查是否启用缓存
if not db_config.enable_database_cache:
logger.info("数据库缓存已禁用,使用最小化缓存实例")
@@ -594,7 +594,7 @@ async def get_cache() -> MultiLevelCache:
max_memory_mb=1,
)
return _global_cache
l1_max_size = db_config.cache_l1_max_size
l1_ttl = db_config.cache_l1_ttl
l2_max_size = db_config.cache_l2_max_size
@@ -602,7 +602,7 @@ async def get_cache() -> MultiLevelCache:
max_memory_mb = db_config.cache_max_memory_mb
max_item_size_mb = db_config.cache_max_item_size_mb
cleanup_interval = db_config.cache_cleanup_interval
logger.info(
f"从配置加载缓存参数: L1({l1_max_size}/{l1_ttl}s), "
f"L2({l2_max_size}/{l2_ttl}s), 内存限制({max_memory_mb}MB), "
@@ -618,7 +618,7 @@ async def get_cache() -> MultiLevelCache:
max_memory_mb = 100
max_item_size_mb = 1
cleanup_interval = 60
_global_cache = MultiLevelCache(
l1_max_size=l1_max_size,
l1_ttl=l1_ttl,

View File

@@ -4,73 +4,74 @@
提供比 sys.getsizeof() 更准确的内存占用估算方法
"""
import sys
import pickle
import sys
from typing import Any
import numpy as np
def get_accurate_size(obj: Any, seen: set | None = None) -> int:
"""
准确估算对象的内存大小(递归计算所有引用对象)
比 sys.getsizeof() 准确得多,特别是对于复杂嵌套对象。
Args:
obj: 要估算大小的对象
seen: 已访问对象的集合(用于避免循环引用)
Returns:
估算的字节数
"""
if seen is None:
seen = set()
obj_id = id(obj)
if obj_id in seen:
return 0
seen.add(obj_id)
size = sys.getsizeof(obj)
# NumPy 数组特殊处理
if isinstance(obj, np.ndarray):
size += obj.nbytes
return size
# 字典:递归计算所有键值对
if isinstance(obj, dict):
size += sum(get_accurate_size(k, seen) + get_accurate_size(v, seen)
size += sum(get_accurate_size(k, seen) + get_accurate_size(v, seen)
for k, v in obj.items())
# 列表、元组、集合:递归计算所有元素
elif isinstance(obj, (list, tuple, set, frozenset)):
elif isinstance(obj, list | tuple | set | frozenset):
size += sum(get_accurate_size(item, seen) for item in obj)
# 有 __dict__ 的对象:递归计算属性
elif hasattr(obj, '__dict__'):
elif hasattr(obj, "__dict__"):
size += get_accurate_size(obj.__dict__, seen)
# 其他可迭代对象
elif hasattr(obj, '__iter__') and not isinstance(obj, (str, bytes, bytearray)):
elif hasattr(obj, "__iter__") and not isinstance(obj, str | bytes | bytearray):
try:
size += sum(get_accurate_size(item, seen) for item in obj)
except:
pass
return size
def get_pickle_size(obj: Any) -> int:
"""
使用 pickle 序列化大小作为参考
通常比 sys.getsizeof() 更接近实际内存占用,
但可能略小于真实内存占用(不包括 Python 对象开销)
Args:
obj: 要估算大小的对象
Returns:
pickle 序列化后的字节数,失败返回 0
"""
@@ -83,17 +84,17 @@ def get_pickle_size(obj: Any) -> int:
def estimate_size_smart(obj: Any, max_depth: int = 5, sample_large: bool = True) -> int:
"""
智能估算对象大小(平衡准确性和性能)
使用深度受限的递归估算+采样策略,平衡准确性和性能:
- 深度5层足以覆盖99%的缓存数据结构
- 对大型容器(>100项进行采样估算
- 性能开销约60倍于sys.getsizeof但准确度提升1000+倍
Args:
obj: 要估算大小的对象
max_depth: 最大递归深度默认5层可覆盖大多数嵌套结构
sample_large: 对大型容器是否采样默认True提升性能
Returns:
估算的字节数
"""
@@ -105,24 +106,24 @@ def _estimate_recursive(obj: Any, depth: int, seen: set, sample_large: bool) ->
# 检查深度限制
if depth <= 0:
return sys.getsizeof(obj)
# 检查循环引用
obj_id = id(obj)
if obj_id in seen:
return 0
seen.add(obj_id)
# 基本大小
size = sys.getsizeof(obj)
# 简单类型直接返回
if isinstance(obj, (int, float, bool, type(None), str, bytes, bytearray)):
if isinstance(obj, int | float | bool | type(None) | str | bytes | bytearray):
return size
# NumPy 数组特殊处理
if isinstance(obj, np.ndarray):
return size + obj.nbytes
# 字典递归
if isinstance(obj, dict):
items = list(obj.items())
@@ -130,7 +131,7 @@ def _estimate_recursive(obj: Any, depth: int, seen: set, sample_large: bool) ->
# 大字典采样前50 + 中间50 + 最后50
sample_items = items[:50] + items[len(items)//2-25:len(items)//2+25] + items[-50:]
sampled_size = sum(
_estimate_recursive(k, depth - 1, seen, sample_large) +
_estimate_recursive(k, depth - 1, seen, sample_large) +
_estimate_recursive(v, depth - 1, seen, sample_large)
for k, v in sample_items
)
@@ -142,9 +143,9 @@ def _estimate_recursive(obj: Any, depth: int, seen: set, sample_large: bool) ->
size += _estimate_recursive(k, depth - 1, seen, sample_large)
size += _estimate_recursive(v, depth - 1, seen, sample_large)
return size
# 列表、元组、集合递归
if isinstance(obj, (list, tuple, set, frozenset)):
if isinstance(obj, list | tuple | set | frozenset):
items = list(obj)
if sample_large and len(items) > 100:
# 大容器采样前50 + 中间50 + 最后50
@@ -160,21 +161,21 @@ def _estimate_recursive(obj: Any, depth: int, seen: set, sample_large: bool) ->
for item in items:
size += _estimate_recursive(item, depth - 1, seen, sample_large)
return size
# 有 __dict__ 的对象
if hasattr(obj, '__dict__'):
if hasattr(obj, "__dict__"):
size += _estimate_recursive(obj.__dict__, depth - 1, seen, sample_large)
return size
def format_size(size_bytes: int) -> str:
"""
格式化字节数为人类可读的格式
Args:
size_bytes: 字节数
Returns:
格式化后的字符串,如 "1.23 MB"
"""