Files
Mofox-Core/src/common/memory_utils.py
LuiKlee 8f77465bc3 ruff
2025-12-13 16:39:25 +08:00

239 lines
7.6 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""
准确的内存大小估算工具
提供比 sys.getsizeof() 更准确的内存占用估算方法
"""
import pickle
import sys
from typing import Any
import numpy as np
def get_accurate_size(obj: Any, seen: set | None = None, max_depth: int = 3, _current_depth: int = 0) -> int:
"""
准确估算对象的内存大小(递归计算所有引用对象)
比 sys.getsizeof() 准确得多,特别是对于复杂嵌套对象。
警告:此函数可能在复杂对象上产生大量临时对象,建议优先使用 estimate_size_smart()
Args:
obj: 要估算大小的对象
seen: 已访问对象的集合(用于避免循环引用)
max_depth: 最大递归深度防止在复杂对象图上递归爆炸默认3层
_current_depth: 当前递归深度(内部参数)
Returns:
估算的字节数
"""
if seen is None:
seen = set()
# 深度限制:防止递归爆炸
if _current_depth >= max_depth:
return sys.getsizeof(obj)
# 对象数量限制:防止内存爆炸
if len(seen) > 10000:
return sys.getsizeof(obj)
obj_id = id(obj)
if obj_id in seen:
return 0
seen.add(obj_id)
size = sys.getsizeof(obj)
# NumPy 数组特殊处理
if isinstance(obj, np.ndarray):
size += obj.nbytes
return size
# 字典:递归计算所有键值对
if isinstance(obj, dict):
# 限制处理的键值对数量
items = list(obj.items())[:1000] # 最多处理1000个键值对
size += sum(get_accurate_size(k, seen, max_depth, _current_depth + 1) +
get_accurate_size(v, seen, max_depth, _current_depth + 1)
for k, v in items)
# 列表、元组、集合:递归计算所有元素
elif isinstance(obj, list | tuple | set | frozenset):
# 限制处理的元素数量
items = list(obj)[:1000] # 最多处理1000个元素
size += sum(get_accurate_size(item, seen, max_depth, _current_depth + 1) for item in items)
# 有 __dict__ 的对象:递归计算属性
elif hasattr(obj, "__dict__"):
size += get_accurate_size(obj.__dict__, seen, max_depth, _current_depth + 1)
# 其他可迭代对象
elif hasattr(obj, "__iter__") and not isinstance(obj, str | bytes | bytearray):
try:
# 限制处理的元素数量
items = list(obj)[:1000] # 最多处理1000个元素
size += sum(get_accurate_size(item, seen, max_depth, _current_depth + 1) for item in items)
except:
pass
return size
def get_pickle_size(obj: Any) -> int:
"""
使用 pickle 序列化大小作为参考
通常比 sys.getsizeof() 更接近实际内存占用,
但可能略小于真实内存占用(不包括 Python 对象开销)
Args:
obj: 要估算大小的对象
Returns:
pickle 序列化后的字节数,失败返回 0
"""
try:
return len(pickle.dumps(obj, protocol=pickle.HIGHEST_PROTOCOL))
except Exception:
return 0
def estimate_size_smart(obj: Any, max_depth: int = 5, sample_large: bool = True) -> int:
"""
智能估算对象大小(平衡准确性和性能)
使用深度受限的递归估算+采样策略,平衡准确性和性能:
- 深度5层足以覆盖99%的缓存数据结构
- 对大型容器(>100项进行采样估算
- 性能开销约60倍于sys.getsizeof但准确度提升1000+倍
Args:
obj: 要估算大小的对象
max_depth: 最大递归深度默认5层可覆盖大多数嵌套结构
sample_large: 对大型容器是否采样默认True提升性能
Returns:
估算的字节数
"""
return _estimate_recursive(obj, max_depth, set(), sample_large)
def _estimate_recursive(obj: Any, depth: int, seen: set, sample_large: bool) -> int:
"""递归估算,带深度限制和采样"""
# 检查深度限制
if depth <= 0:
return sys.getsizeof(obj)
# 检查循环引用
obj_id = id(obj)
if obj_id in seen:
return 0
seen.add(obj_id)
# 基本大小
size = sys.getsizeof(obj)
# 简单类型直接返回
if isinstance(obj, int | float | bool | type(None) | str | bytes | bytearray):
return size
# NumPy 数组特殊处理
if isinstance(obj, np.ndarray):
return size + obj.nbytes
# 字典递归
if isinstance(obj, dict):
items = list(obj.items())
if sample_large and len(items) > 100:
# 大字典采样前50 + 中间50 + 最后50
sample_items = items[:50] + items[len(items)//2-25:len(items)//2+25] + items[-50:]
sampled_size = sum(
_estimate_recursive(k, depth - 1, seen, sample_large) +
_estimate_recursive(v, depth - 1, seen, sample_large)
for k, v in sample_items
)
# 按比例推算总大小
size += int(sampled_size * len(items) / len(sample_items))
else:
# 小字典全部计算
for k, v in items:
size += _estimate_recursive(k, depth - 1, seen, sample_large)
size += _estimate_recursive(v, depth - 1, seen, sample_large)
return size
# 列表、元组、集合递归
if isinstance(obj, list | tuple | set | frozenset):
items = list(obj)
if sample_large and len(items) > 100:
# 大容器采样前50 + 中间50 + 最后50
sample_items = items[:50] + items[len(items)//2-25:len(items)//2+25] + items[-50:]
sampled_size = sum(
_estimate_recursive(item, depth - 1, seen, sample_large)
for item in sample_items
)
# 按比例推算总大小
size += int(sampled_size * len(items) / len(sample_items))
else:
# 小容器全部计算
for item in items:
size += _estimate_recursive(item, depth - 1, seen, sample_large)
return size
# 有 __dict__ 的对象
if hasattr(obj, "__dict__"):
size += _estimate_recursive(obj.__dict__, depth - 1, seen, sample_large)
return size
def estimate_cache_item_size(obj: Any) -> int:
"""
估算缓存条目的大小。
使用轻量级的方法快速估算大小,避免递归爆炸:
1. 优先使用 pickle 大小(快速且准确)
2. 对于无法 pickle 的对象,使用深度受限的智能估算
3. 最后兜底使用 sys.getsizeof
性能优化:避免调用 get_accurate_size(),该函数在复杂对象上会产生大量临时对象
"""
# 方法1: pickle 大小(最快最准确)
pickle_size = get_pickle_size(obj)
if pickle_size > 0:
# pickle 通常略小于实际内存乘以1.5作为安全系数
return int(pickle_size * 1.5)
# 方法2: 智能估算(深度受限,采样大容器)
try:
smart_size = estimate_size_smart(obj, max_depth=5, sample_large=True)
if smart_size > 0:
return smart_size
except Exception:
pass
def format_size(size_bytes: int) -> str:
"""
格式化字节数为人类可读的格式
Args:
size_bytes: 字节数
Returns:
格式化后的字符串,如 "1.23 MB"
"""
if size_bytes < 1024:
return f"{size_bytes} B"
elif size_bytes < 1024 * 1024:
return f"{size_bytes / 1024:.2f} KB"
elif size_bytes < 1024 * 1024 * 1024:
return f"{size_bytes / 1024 / 1024:.2f} MB"
else:
return f"{size_bytes / 1024 / 1024 / 1024:.2f} GB"
# 向后兼容的别名
get_deep_size = get_accurate_size