实现慢查询监控系统

该功能默认关闭
This commit is contained in:
LuiKlee
2025-12-17 14:09:02 +08:00
parent 410614cf62
commit d6ba543b24
8 changed files with 1197 additions and 9 deletions

View File

@@ -33,6 +33,13 @@ from .monitoring import (
record_cache_miss,
record_operation,
reset_stats,
get_slow_queries,
get_slow_query_report,
record_slow_query,
set_slow_query_config,
enable_slow_query_monitoring,
disable_slow_query_monitoring,
is_slow_query_monitoring_enabled,
)
__all__ = [
@@ -57,6 +64,13 @@ __all__ = [
"record_cache_miss",
"record_operation",
"reset_stats",
"get_slow_queries",
"get_slow_query_report",
"record_slow_query",
"set_slow_query_config",
"enable_slow_query_monitoring",
"disable_slow_query_monitoring",
"is_slow_query_monitoring_enabled",
# 装饰器
"retry",
"timeout",

View File

@@ -213,37 +213,68 @@ def cached(
return decorator
def measure_time(log_slow: float | None = None):
def measure_time(log_slow: float | None = None, operation_name: str | None = None):
"""性能测量装饰器
测量函数执行时间,可选择性记录慢查询
测量函数执行时间,可选择性记录慢查询并集成到监控系统
Args:
log_slow: 慢查询阈值(秒),超过此时间会记录warning日志
log_slow: 慢查询阈值(秒),None 表示使用配置中的阈值0 表示禁用
operation_name: 操作名称用于监控统计None 表示使用函数名
Example:
@measure_time(log_slow=1.0)
async def complex_query():
return await session.execute(stmt)
@measure_time() # 使用配置的阈值
async def database_query():
return await session.execute(stmt)
"""
def decorator(func: Callable[P, Coroutine[Any, Any, R]]) -> Callable[P, Coroutine[Any, Any, R]]:
@functools.wraps(func)
async def wrapper(*args: P.args, **kwargs: P.kwargs) -> R:
from src.common.database.utils.monitoring import get_monitor
# 确定操作名称
op_name = operation_name or func.__name__
start_time = time.perf_counter()
success = False
try:
result = await func(*args, **kwargs)
success = True
return result
finally:
elapsed = time.perf_counter() - start_time
if log_slow and elapsed > log_slow:
logger.warning(
f"{func.__name__} 执行缓慢: {elapsed:.3f}s (阈值: {log_slow}s)"
)
# 获取监控器
monitor = get_monitor()
# 记录到监控系统
if success:
monitor.record_operation(op_name, elapsed, success=True)
# 只在监控启用时检查慢查询
if monitor.is_enabled():
# 判断是否为慢查询
threshold = log_slow
if threshold is None:
# 使用配置中的阈值
threshold = monitor.get_metrics().slow_query_threshold
if threshold > 0 and elapsed > threshold:
logger.warning(
f"🐢 {func.__name__} 执行缓慢: {elapsed:.3f}s (阈值: {threshold:.3f}s)"
)
else:
logger.debug(f"{func.__name__} 执行时间: {elapsed:.3f}s")
else:
logger.debug(f"{func.__name__} 执行时间: {elapsed:.3f}s")
else:
logger.debug(f"{func.__name__} 执行时间: {elapsed:.3f}s")
monitor.record_operation(op_name, elapsed, success=False)
return wrapper

View File

@@ -4,6 +4,7 @@
"""
import time
from collections import deque
from dataclasses import dataclass, field
from typing import Any, Optional
@@ -12,6 +13,24 @@ from src.common.logger import get_logger
logger = get_logger("database.monitoring")
@dataclass
class SlowQueryRecord:
"""慢查询记录"""
operation_name: str
execution_time: float
timestamp: float
sql: str | None = None
args: tuple | None = None
stack_trace: str | None = None
def __str__(self) -> str:
return (
f"[{self.operation_name}] {self.execution_time:.3f}s "
f"@ {time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(self.timestamp))}"
)
@dataclass
class OperationMetrics:
"""操作指标"""
@@ -22,6 +41,7 @@ class OperationMetrics:
max_time: float = 0.0
error_count: int = 0
last_execution_time: float | None = None
slow_query_count: int = 0 # 该操作的慢查询数
@property
def avg_time(self) -> float:
@@ -40,6 +60,10 @@ class OperationMetrics:
"""记录错误"""
self.error_count += 1
def record_slow_query(self) -> None:
    """Count one more slow query against this operation."""
    self.slow_query_count = self.slow_query_count + 1
@dataclass
class DatabaseMetrics:
@@ -64,6 +88,10 @@ class DatabaseMetrics:
batch_items_total: int = 0
batch_avg_size: float = 0.0
# 慢查询统计
slow_query_count: int = 0
slow_query_threshold: float = 0.5 # 慢查询阈值
@property
def cache_hit_rate(self) -> float:
"""缓存命中率"""
@@ -92,26 +120,83 @@ class DatabaseMonitor:
_instance: Optional["DatabaseMonitor"] = None
_metrics: DatabaseMetrics
_slow_queries: deque # 最近的慢查询记录
_slow_query_buffer_size: int = 100
_enabled: bool = False # 慢查询监控是否启用
def __new__(cls):
    """Return the shared monitor instance, creating it on first use.

    On first creation the instance gets fresh metrics, an empty bounded
    slow-query buffer sized by the class-level buffer size, and monitoring
    switched off.
    """
    existing = cls._instance
    if existing is not None:
        return existing

    created = super().__new__(cls)
    cls._instance = created
    created._metrics = DatabaseMetrics()
    created._slow_queries = deque(maxlen=cls._slow_query_buffer_size)
    created._enabled = False
    return created
def enable(self) -> None:
    """Switch slow-query monitoring on and log the change."""
    self._enabled = True
    logger.info("✅ 慢查询监控已启用")
def disable(self) -> None:
    """Switch slow-query monitoring off and log the change."""
    self._enabled = False
    logger.info("❌ 慢查询监控已禁用")
def is_enabled(self) -> bool:
    """Report whether slow-query monitoring is currently switched on."""
    enabled = self._enabled
    return enabled
def set_slow_query_config(self, threshold: float, buffer_size: int):
    """Apply a new slow-query threshold and buffer capacity.

    Args:
        threshold: Execution time above which a successful operation is
            recorded as a slow query.
        buffer_size: Maximum number of slow-query records kept in memory.

    Note:
        Calling this also switches slow-query monitoring on.
    """
    self._metrics.slow_query_threshold = threshold
    self._slow_query_buffer_size = buffer_size
    # Resize the buffer without losing history: building a bounded deque
    # from the existing one keeps the most recent `buffer_size` records
    # instead of silently discarding everything captured so far.
    self._slow_queries = deque(self._slow_queries, maxlen=buffer_size)
    # Configuring the monitor implies the caller wants it running.
    self._enabled = True
def record_operation(
    self,
    operation_name: str,
    execution_time: float,
    success: bool = True,
    sql: str | None = None,
):
    """Record the outcome of one operation.

    Failures only bump the operation's error counter. Successes record the
    execution time and, when monitoring is enabled and the time exceeds the
    configured threshold, are additionally recorded as a slow query.

    Args:
        operation_name: Name under which the operation is tracked.
        execution_time: Measured duration of the operation.
        success: Whether the operation completed without error.
        sql: Optional statement text forwarded to the slow-query record.
    """
    op_metrics = self._metrics.get_operation_metrics(operation_name)

    if not success:
        op_metrics.record_error()
        return

    op_metrics.record_success(execution_time)

    # Slow-query detection is opt-in; skip the threshold check when disabled.
    if not self._enabled:
        return
    if execution_time > self._metrics.slow_query_threshold:
        self.record_slow_query(operation_name, execution_time, sql)
def record_slow_query(
    self,
    operation_name: str,
    execution_time: float,
    sql: str | None = None,
    args: tuple | None = None,
    stack_trace: str | None = None,
):
    """Store a slow-query record and emit a warning for it.

    Increments both the global and the per-operation slow-query counters,
    appends a timestamped record to the bounded buffer, and logs it.

    Args:
        operation_name: Name under which the operation is tracked.
        execution_time: Measured duration of the slow operation.
        sql: Optional statement text.
        args: Optional statement arguments.
        stack_trace: Optional captured stack trace.
    """
    totals = self._metrics
    totals.slow_query_count += 1
    totals.get_operation_metrics(operation_name).record_slow_query()

    entry = SlowQueryRecord(
        operation_name,
        execution_time,
        time.time(),
        sql=sql,
        args=args,
        stack_trace=stack_trace,
    )
    self._slow_queries.append(entry)

    # Log right away so slow queries surface in real time.
    logger.warning(f"🐢 慢查询: {entry}")
def record_connection_acquired(self):
"""记录连接获取"""
self._metrics.connection_acquired += 1
@@ -152,6 +237,81 @@ class DatabaseMonitor:
"""获取指标"""
return self._metrics
def get_slow_queries(self, limit: int = 0) -> list[SlowQueryRecord]:
    """Return a snapshot of the buffered slow-query records.

    Args:
        limit: Maximum number of records to return, taken from the newest
            end of the buffer. Zero (or any non-positive value) returns all.

    Returns:
        A new list of records, oldest first.
    """
    snapshot = list(self._slow_queries)
    return snapshot[-limit:] if limit > 0 else snapshot
def get_slow_query_report(self) -> dict[str, Any]:
    """Summarise the buffered slow queries.

    Returns:
        A dict with four keys: ``total`` (number of buffered records),
        ``threshold`` (formatted threshold), ``top_operations`` (up to ten
        operations ordered by slow-query count, with formatted avg/max/min
        times) and ``recent_queries`` (the last twenty records with a
        local-time timestamp).
    """
    captured = list(self._slow_queries)
    threshold_text = f"{self._metrics.slow_query_threshold:.3f}s"

    if not captured:
        return {
            "total": 0,
            "threshold": threshold_text,
            "top_operations": [],
            "recent_queries": [],
        }

    # Aggregate count / total / max / min per operation name.
    per_operation: dict[str, dict[str, Any]] = {}
    for entry in captured:
        bucket = per_operation.get(entry.operation_name)
        if bucket is None:
            bucket = {
                "count": 0,
                "total_time": 0.0,
                "max_time": 0.0,
                "min_time": float("inf"),
            }
            per_operation[entry.operation_name] = bucket
        duration = entry.execution_time
        bucket["count"] += 1
        bucket["total_time"] += duration
        bucket["max_time"] = max(bucket["max_time"], duration)
        bucket["min_time"] = min(bucket["min_time"], duration)

    # Most frequent offenders first; keep only the top ten.
    ranked = sorted(
        per_operation.items(),
        key=lambda item: item[1]["count"],
        reverse=True,
    )[:10]

    top_operations = []
    for name, bucket in ranked:
        top_operations.append(
            {
                "operation": name,
                "count": bucket["count"],
                "avg_time": f"{bucket['total_time'] / bucket['count']:.3f}s",
                "max_time": f"{bucket['max_time']:.3f}s",
                "min_time": f"{bucket['min_time']:.3f}s",
            }
        )

    recent_queries = []
    for entry in captured[-20:]:
        recent_queries.append(
            {
                "operation": entry.operation_name,
                "time": f"{entry.execution_time:.3f}s",
                "timestamp": time.strftime(
                    "%Y-%m-%d %H:%M:%S",
                    time.localtime(entry.timestamp),
                ),
            }
        )

    return {
        "total": len(captured),
        "threshold": threshold_text,
        "top_operations": top_operations,
        "recent_queries": recent_queries,
    }
def get_summary(self) -> dict[str, Any]:
"""获取统计摘要"""
metrics = self._metrics
@@ -164,6 +324,7 @@ class DatabaseMonitor:
"min_time": f"{op_metrics.min_time:.3f}s",
"max_time": f"{op_metrics.max_time:.3f}s",
"error_count": op_metrics.error_count,
"slow_query_count": op_metrics.slow_query_count,
}
return {
@@ -188,6 +349,8 @@ class DatabaseMonitor:
},
"overall": {
"error_rate": f"{metrics.error_rate:.2%}",
"slow_query_count": metrics.slow_query_count,
"slow_query_threshold": f"{metrics.slow_query_threshold:.3f}s",
},
}
@@ -209,7 +372,8 @@ class DatabaseMonitor:
f"平均={stats['avg_time']}, "
f"最小={stats['min_time']}, "
f"最大={stats['max_time']}, "
f"错误={stats['error_count']}"
f"错误={stats['error_count']}, "
f"慢查询={stats['slow_query_count']}"
)
# 连接池统计
@@ -246,6 +410,24 @@ class DatabaseMonitor:
logger.info("\n整体:")
overall = summary["overall"]
logger.info(f" 错误率={overall['error_rate']}")
logger.info(f" 慢查询总数={overall['slow_query_count']}")
logger.info(f" 慢查询阈值={overall['slow_query_threshold']}")
# 慢查询报告
if overall["slow_query_count"] > 0:
logger.info("\n🐢 慢查询报告:")
slow_report = self.get_slow_query_report()
if slow_report["top_operations"]:
logger.info(" 按操作排名Top 10:")
for idx, op in enumerate(slow_report["top_operations"], 1):
logger.info(
f" {idx}. {op['operation']}: "
f"次数={op['count']}, "
f"平均={op['avg_time']}, "
f"最大={op['max_time']}"
)
logger.info("=" * 60)
@@ -273,6 +455,46 @@ def record_operation(operation_name: str, execution_time: float, success: bool =
get_monitor().record_operation(operation_name, execution_time, success)
def record_slow_query(
    operation_name: str,
    execution_time: float,
    sql: str | None = None,
    args: tuple | None = None,
):
    """Record a slow query on the shared monitor.

    Args:
        operation_name: Name under which the operation is tracked.
        execution_time: Measured duration of the slow operation.
        sql: Optional statement text.
        args: Optional statement arguments.
    """
    monitor = get_monitor()
    monitor.record_slow_query(operation_name, execution_time, sql, args)
def get_slow_queries(limit: int = 0) -> list[SlowQueryRecord]:
    """Return slow-query records from the shared monitor.

    Args:
        limit: Forwarded unchanged to the monitor's ``get_slow_queries``.
    """
    monitor = get_monitor()
    return monitor.get_slow_queries(limit)
def get_slow_query_report() -> dict[str, Any]:
    """Return the slow-query report built by the shared monitor."""
    monitor = get_monitor()
    return monitor.get_slow_query_report()
def set_slow_query_config(threshold: float, buffer_size: int):
    """Apply a slow-query threshold and buffer size to the shared monitor.

    Args:
        threshold: Forwarded to the monitor as the slow-query threshold.
        buffer_size: Forwarded to the monitor as the record buffer capacity.
    """
    monitor = get_monitor()
    monitor.set_slow_query_config(threshold, buffer_size)
def enable_slow_query_monitoring():
    """Switch slow-query monitoring on for the shared monitor."""
    monitor = get_monitor()
    monitor.enable()
def disable_slow_query_monitoring():
    """Switch slow-query monitoring off for the shared monitor."""
    monitor = get_monitor()
    monitor.disable()
def is_slow_query_monitoring_enabled() -> bool:
    """Report whether the shared monitor has slow-query monitoring on."""
    monitor = get_monitor()
    return monitor.is_enabled()
def record_cache_hit():
"""记录缓存命中"""
get_monitor().record_cache_hit()

View File

@@ -0,0 +1,437 @@
"""慢查询分析工具
提供慢查询的详细分析和报告生成功能
"""
import time
from collections import defaultdict
from datetime import datetime
from typing import Any
from src.common.database.utils.monitoring import get_monitor
from src.common.logger import get_logger
logger = get_logger("database.slow_query_analyzer")
class SlowQueryAnalyzer:
    """Builds reports and filtered views from the monitor's slow-query data.

    All methods are static and read from the monitor returned by
    ``get_monitor()``.
    """

    @staticmethod
    def generate_html_report(output_file: str | None = None) -> str:
        """Build the slow-query report as a standalone HTML page.

        Args:
            output_file: Path to write the page to. When falsy, nothing is
                written and the HTML is only returned.

        Returns:
            The complete HTML document as a string.
        """
        monitor = get_monitor()
        report = monitor.get_slow_query_report()
        metrics = monitor.get_metrics()

        # Values interpolated into the template are prepared up front so the
        # template itself stays free of logic.
        generated_at = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
        total_operations = sum(m.count for m in metrics.operations.values())
        if total_operations > 0:
            slow_share = f"{(report['total'] / total_operations * 100):.1f}%"
        else:
            slow_share = "0%"

        placeholder = '<div class="empty-state"><div class="empty-state-icon">📭</div><p>暂无数据</p></div>'
        if report['top_operations']:
            operations_section = _render_operations_table(report)
        else:
            operations_section = placeholder
        if report['recent_queries']:
            recent_section = _render_recent_queries_table(report)
        else:
            recent_section = placeholder
        suggestions_section = _render_suggestions(report, metrics)

        html = f"""<!DOCTYPE html>
<html lang="zh-CN">
<head>
<meta charset="UTF-8">
<meta name="viewport" content="width=device-width, initial-scale=1.0">
<title>数据库慢查询报告</title>
<style>
* {{ margin: 0; padding: 0; box-sizing: border-box; }}
body {{
font-family: -apple-system, BlinkMacSystemFont, 'Segoe UI', 'Roboto', 'Oxygen', 'Ubuntu', 'Cantarell', sans-serif;
background: #f5f5f5;
padding: 20px;
}}
.container {{
max-width: 1200px;
margin: 0 auto;
background: white;
border-radius: 8px;
box-shadow: 0 2px 8px rgba(0,0,0,0.1);
overflow: hidden;
}}
header {{
background: linear-gradient(135deg, #667eea 0%, #764ba2 100%);
color: white;
padding: 30px;
text-align: center;
}}
header h1 {{
font-size: 28px;
margin-bottom: 10px;
}}
header p {{
font-size: 14px;
opacity: 0.9;
}}
.stats {{
display: grid;
grid-template-columns: repeat(auto-fit, minmax(200px, 1fr));
gap: 20px;
padding: 30px;
background: #f9f9f9;
border-bottom: 1px solid #eee;
}}
.stat-card {{
text-align: center;
padding: 20px;
background: white;
border-radius: 6px;
border-left: 4px solid #667eea;
}}
.stat-card .value {{
font-size: 28px;
font-weight: bold;
color: #333;
margin: 10px 0;
}}
.stat-card .label {{
font-size: 12px;
color: #999;
text-transform: uppercase;
}}
.section {{
padding: 30px;
border-bottom: 1px solid #eee;
}}
.section:last-child {{
border-bottom: none;
}}
.section h2 {{
font-size: 20px;
margin-bottom: 20px;
color: #333;
border-bottom: 2px solid #667eea;
padding-bottom: 10px;
}}
table {{
width: 100%;
border-collapse: collapse;
font-size: 14px;
}}
table thead {{
background: #f9f9f9;
}}
table th {{
padding: 12px;
text-align: left;
font-weight: 600;
color: #666;
border-bottom: 2px solid #ddd;
}}
table td {{
padding: 12px;
border-bottom: 1px solid #eee;
}}
table tbody tr:hover {{
background: #f9f9f9;
}}
.badge {{
display: inline-block;
padding: 4px 8px;
border-radius: 3px;
font-size: 12px;
font-weight: 600;
}}
.badge-warning {{
background: #fff3cd;
color: #856404;
}}
.badge-danger {{
background: #f8d7da;
color: #721c24;
}}
.badge-success {{
background: #d4edda;
color: #155724;
}}
.progress-bar {{
height: 4px;
background: #eee;
border-radius: 2px;
overflow: hidden;
margin-top: 4px;
}}
.progress-bar-fill {{
height: 100%;
background: linear-gradient(90deg, #667eea, #764ba2);
}}
.empty-state {{
text-align: center;
padding: 40px;
color: #999;
}}
.empty-state-icon {{
font-size: 48px;
margin-bottom: 16px;
}}
</style>
</head>
<body>
<div class="container">
<header>
<h1>🐢 数据库慢查询报告</h1>
<p>生成时间: {generated_at}</p>
</header>
<div class="stats">
<div class="stat-card">
<div class="label">总慢查询数</div>
<div class="value">{report['total']}</div>
</div>
<div class="stat-card">
<div class="label">慢查询阈值</div>
<div class="value">{report['threshold']}</div>
</div>
<div class="stat-card">
<div class="label">总操作数</div>
<div class="value">{total_operations}</div>
</div>
<div class="stat-card">
<div class="label">慢查询比例</div>
<div class="value">
{slow_share}
</div>
</div>
</div>
<div class="section">
<h2>📊 按操作排名 (Top 10)</h2>
{operations_section}
</div>
<div class="section">
<h2>⏱️ 最近的慢查询 (Top 20)</h2>
{recent_section}
</div>
<div class="section">
<h2>💡 优化建议</h2>
{suggestions_section}
</div>
</div>
</body>
</html>
"""

        if output_file:
            with open(output_file, "w", encoding="utf-8") as f:
                f.write(html)
            logger.info(f"慢查询报告已生成: {output_file}")

        return html

    @staticmethod
    def generate_text_report() -> str:
        """Build the slow-query report as plain text.

        Returns:
            The report as a single newline-joined string.
        """
        monitor = get_monitor()
        report = monitor.get_slow_query_report()
        metrics = monitor.get_metrics()

        heavy_rule = "=" * 80
        light_rule = "-" * 80
        total_ops = sum(m.count for m in metrics.operations.values())

        # Header and overall statistics.
        lines = [
            heavy_rule,
            "🐢 数据库慢查询报告".center(80),
            heavy_rule,
            f"生成时间: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}",
            "",
            "📊 总体统计",
            light_rule,
            f" 总慢查询数: {report['total']}",
            f" 慢查询阈值: {report['threshold']}",
            f" 总操作数: {total_ops}",
        ]
        if total_ops > 0:
            lines.append(f" 慢查询比例: {report['total'] / total_ops * 100:.1f}%")
        lines.append("")

        # Ranking of operations by slow-query count.
        if report["top_operations"]:
            lines += [
                "📈 按操作排名 (Top 10)",
                light_rule,
                f"{'#':<3} {'操作名':<30} {'次数':<8} {'平均时间':<12} {'最大时间':<12}",
                light_rule,
            ]
            lines.extend(
                f"{rank:<3} {op['operation']:<30} {op['count']:<8} "
                f"{op['avg_time']:<12} {op['max_time']:<12}"
                for rank, op in enumerate(report["top_operations"], 1)
            )
            lines.append("")

        # Most recent slow queries.
        if report["recent_queries"]:
            lines += [
                "⏱️ 最近的慢查询 (最近 20 条)",
                light_rule,
                f"{'时间':<20} {'操作':<30} {'执行时间':<15}",
                light_rule,
            ]
            lines.extend(
                f"{item['timestamp']:<20} {item['operation']:<30} {item['time']:<15}"
                for item in report["recent_queries"]
            )
            lines.append("")

        # Optimisation suggestions.
        lines += ["💡 优化建议", light_rule]
        lines.extend(f"{tip}" for tip in _get_suggestions(report, metrics))
        lines.append(heavy_rule)

        return "\n".join(lines)

    @staticmethod
    def get_slow_queries_by_operation(operation_name: str) -> list[Any]:
        """Return the buffered slow queries recorded for one operation.

        Args:
            operation_name: Operation name to match exactly.

        Returns:
            Matching slow-query records in buffer order.
        """
        matches = []
        for entry in get_monitor().get_slow_queries():
            if entry.operation_name == operation_name:
                matches.append(entry)
        return matches

    @staticmethod
    def get_slowest_queries(limit: int = 20) -> list[Any]:
        """Return the buffered slow queries with the longest execution times.

        Args:
            limit: Number of records to keep after sorting.

        Returns:
            Slow-query records sorted by execution time, slowest first.
        """
        ranked = sorted(
            get_monitor().get_slow_queries(),
            key=lambda entry: entry.execution_time,
            reverse=True,
        )
        return ranked[:limit]
def _render_operations_table(report: dict) -> str:
    """Render the "top operations" ranking as an HTML table.

    Args:
        report: Slow-query report; ``report["top_operations"]`` is a list of
            dicts with ``operation``, ``count``, ``avg_time`` and ``max_time``.

    Returns:
        An HTML ``<table>`` fragment, or an empty-state ``<div>`` when there
        are no operations to show.
    """
    from html import escape

    if not report["top_operations"]:
        return '<div class="empty-state"><p>暂无数据</p></div>'

    def cell(value: object) -> str:
        # Operation names are free-form (function names or caller-supplied
        # labels), so every interpolated value is escaped before it is
        # placed into markup.
        return escape(str(value))

    rows = [
        f"""
<tr>
<td>#{idx}</td>
<td><strong>{cell(op['operation'])}</strong></td>
<td><span class="badge badge-warning">{cell(op['count'])}</span></td>
<td>{cell(op['avg_time'])}</td>
<td>{cell(op['max_time'])}</td>
</tr>
"""
        for idx, op in enumerate(report["top_operations"], 1)
    ]

    return f"""
<table>
<thead>
<tr>
<th style="width: 5%">#</th>
<th style="width: 40%">操作名</th>
<th style="width: 15%">慢查询次数</th>
<th style="width: 20%">平均执行时间</th>
<th style="width: 20%">最大执行时间</th>
</tr>
</thead>
<tbody>
{''.join(rows)}
</tbody>
</table>
"""
def _render_recent_queries_table(report: dict) -> str:
    """Render the most recent slow queries as an HTML table.

    Args:
        report: Slow-query report; ``report["recent_queries"]`` is a list of
            dicts with ``timestamp``, ``operation`` and ``time``.

    Returns:
        An HTML ``<table>`` fragment, or an empty-state ``<div>`` when there
        are no recent queries.
    """
    from html import escape

    if not report["recent_queries"]:
        return '<div class="empty-state"><p>暂无数据</p></div>'

    def cell(value: object) -> str:
        # Operation names are free-form text; escape everything that is
        # interpolated so it cannot inject markup into the report.
        return escape(str(value))

    rows = [
        f"""
<tr>
<td>{cell(record['timestamp'])}</td>
<td>{cell(record['operation'])}</td>
<td><span class="badge badge-danger">{cell(record['time'])}</span></td>
</tr>
"""
        for record in report["recent_queries"]
    ]

    return f"""
<table>
<thead>
<tr>
<th style="width: 25%">时间</th>
<th style="width: 50%">操作名</th>
<th style="width: 25%">执行时间</th>
</tr>
</thead>
<tbody>
{''.join(rows)}
</tbody>
</table>
"""
def _get_suggestions(report: dict, metrics: Any) -> list[str]:
    """Derive optimisation suggestions from a slow-query report.

    Args:
        report: Slow-query report with ``total`` and ``top_operations``
            (each entry has ``operation`` and ``count``).
        metrics: Object whose ``operations`` mapping yields per-operation
            metrics exposing ``count`` and ``max_time``.

    Returns:
        A non-empty list of suggestion strings.
    """
    if report["total"] == 0:
        return ["✅ 没有检测到慢查询,性能良好!"]

    suggestions = []

    # Share of slow queries among all recorded operations.
    total_ops = sum(m.count for m in metrics.operations.values())
    slow_ratio = report["total"] / total_ops if total_ops > 0 else 0

    if slow_ratio > 0.1:
        suggestions.append(f"⚠️ 慢查询比例较高 ({slow_ratio * 100:.1f}%),建议检查数据库索引和查询优化")

    if report["top_operations"]:
        top_op = report["top_operations"][0]
        suggestions.append(f"🔍 '{top_op['operation']}' 是最常见的慢查询,建议优先优化这个操作")

        if top_op["count"] > total_ops * 0.3:
            suggestions.append("🚀 优化最频繁的慢查询可能会显著提升性能")

    # Flag operations whose worst observed execution time is extreme.
    for op_name, op_metrics in metrics.operations.items():
        if op_metrics.max_time > 5:
            # The two clauses are separated by a comma so the sentence does
            # not run together after the closing parenthesis.
            suggestions.append(
                f"⏱️ '{op_name}' 的最大执行时间超过 5 秒 ({op_metrics.max_time:.1f}s),"
                "这可能表明有异常的查询操作"
            )

    if len(report["top_operations"]) > 1:
        top_2_count = sum(op["count"] for op in report["top_operations"][:2])
        top_2_ratio = top_2_count / report["total"]
        if top_2_ratio > 0.7:
            # Report the measured share rather than a fixed figure, which
            # would disagree with the 70% trigger above.
            suggestions.append(
                f"🎯 {top_2_ratio * 100:.0f}% 的慢查询集中在少数操作上,建议针对这些操作进行优化"
            )

    if not suggestions:
        suggestions.append("💡 考虑调整 slow_query_threshold 以获得更细致的分析")

    return suggestions
def _render_suggestions(report: dict, metrics: Any) -> str:
    """Render the optimisation suggestions as an HTML list.

    Args:
        report: Slow-query report passed through to ``_get_suggestions``.
        metrics: Metrics object passed through to ``_get_suggestions``.

    Returns:
        An HTML ``<ul>`` fragment with one ``<li>`` per suggestion.
    """
    from html import escape

    suggestions = _get_suggestions(report, metrics)
    # Suggestions quote operation names verbatim, so escape each one before
    # placing it into markup.
    items = ''.join(
        f'<li style="padding: 8px 0; line-height: 1.6;">{escape(s)}</li>'
        for s in suggestions
    )
    return f"""
<ul style="list-style: none; padding: 0;">
{items}
</ul>
"""