支持上下文+装饰器的计时器
This commit is contained in:
@@ -1,56 +1,81 @@
|
||||
from time import perf_counter
|
||||
from typing import Dict, Optional
|
||||
|
||||
"""
|
||||
计时器:用于性能计时
|
||||
|
||||
感谢D指导
|
||||
"""
|
||||
from functools import wraps
|
||||
from typing import Optional, Dict, Callable
|
||||
import asyncio
|
||||
|
||||
|
||||
class TimerTypeError(TypeError):
|
||||
"""自定义类型错误异常"""
|
||||
"""自定义类型错误"""
|
||||
|
||||
def __init__(self, param_name, expected_type, actual_type):
|
||||
super().__init__(f"Invalid type for '{param_name}'. Expected {expected_type}, got {actual_type.__name__}")
|
||||
def __init__(self, param, expected_type, actual_type):
|
||||
super().__init__(f"参数 '{param}' 类型错误,期望 {expected_type},实际得到 {actual_type.__name__}")
|
||||
|
||||
|
||||
class Timer:
|
||||
def __init__(self, name: Optional[str] = None, storage: Optional[Dict[str, float]] = None):
|
||||
"""支持多种模式的计时器(类型安全+人类可读)"""
|
||||
|
||||
def __init__(self, name: Optional[str] = None, storage: Optional[Dict[str, float]] = None, auto_unit: bool = True):
|
||||
self._validate_types(name, storage)
|
||||
|
||||
self.name = name # 计时器名称
|
||||
self.storage = storage # 计时结果存储
|
||||
self.elapsed = None # 计时结果
|
||||
self.name = name
|
||||
self.storage = storage
|
||||
self.elapsed = None
|
||||
self.auto_unit = auto_unit
|
||||
self._is_context = False
|
||||
|
||||
def _validate_types(self, name, storage):
|
||||
"""类型验证核心方法"""
|
||||
# 验证 name 类型
|
||||
"""类型检查"""
|
||||
if name is not None and not isinstance(name, str):
|
||||
raise TimerTypeError(param_name="name", expected_type="Optional[str]", actual_type=type(name))
|
||||
raise TimerTypeError("name", "Optional[str]", type(name))
|
||||
|
||||
# 验证 storage 类型
|
||||
if storage is not None and not isinstance(storage, dict):
|
||||
raise TimerTypeError(
|
||||
param_name="storage", expected_type="Optional[Dict[str, float]]", actual_type=type(storage)
|
||||
)
|
||||
raise TimerTypeError("storage", "Optional[dict]", type(storage))
|
||||
|
||||
def __call__(self, func: Optional[Callable] = None) -> Callable:
|
||||
"""装饰器模式"""
|
||||
if func is None:
|
||||
return lambda f: Timer(name=self.name or f.__name__, storage=self.storage, auto_unit=self.auto_unit)(f)
|
||||
|
||||
@wraps(func)
|
||||
async def async_wrapper(*args, **kwargs):
|
||||
with self:
|
||||
return await func(*args, **kwargs)
|
||||
|
||||
@wraps(func)
|
||||
def sync_wrapper(*args, **kwargs):
|
||||
with self:
|
||||
return func(*args, **kwargs)
|
||||
|
||||
wrapper = async_wrapper if asyncio.iscoroutinefunction(func) else sync_wrapper
|
||||
wrapper.__timer__ = self # 保留计时器引用
|
||||
return wrapper
|
||||
|
||||
def __enter__(self):
|
||||
"""上下文管理器入口"""
|
||||
self._is_context = True
|
||||
self.start = perf_counter()
|
||||
return self
|
||||
|
||||
def __exit__(self, *args):
|
||||
self.end = perf_counter()
|
||||
self.elapsed = self.end - self.start
|
||||
if isinstance(self.storage, dict) and self.name:
|
||||
self.elapsed = perf_counter() - self.start
|
||||
self._record_time()
|
||||
self._is_context = False
|
||||
return False
|
||||
|
||||
def _record_time(self):
|
||||
"""记录时间"""
|
||||
if self.storage is not None and self.name:
|
||||
self.storage[self.name] = self.elapsed
|
||||
|
||||
def get_result(self) -> float:
|
||||
"""安全获取计时结果"""
|
||||
return self.elapsed or 0.0
|
||||
|
||||
@property
|
||||
def human_readable(self) -> str:
|
||||
"""返回人类可读时间格式"""
|
||||
if self.elapsed >= 1:
|
||||
return f"{self.elapsed:.2f}秒"
|
||||
return f"{self.elapsed * 1000:.2f}毫秒"
|
||||
"""人类可读时间格式"""
|
||||
if self.elapsed is None:
|
||||
return "未计时"
|
||||
|
||||
if self.auto_unit:
|
||||
return f"{self.elapsed * 1000:.2f}毫秒" if self.elapsed < 1 else f"{self.elapsed:.2f}秒"
|
||||
return f"{self.elapsed:.4f}秒"
|
||||
|
||||
def __str__(self):
|
||||
return f"<Timer {self.name or '匿名'} [{self.human_readable}]>"
|
||||
|
||||
Reference in New Issue
Block a user