Merge branch 'dev' of https://github.com/MoFox-Studio/MoFox-Core into dev
@@ -3,8 +3,8 @@ from collections import defaultdict
from datetime import datetime, timedelta
from typing import Any

from src.common.database.compatibility import db_get, db_query
from src.common.database.api.query import QueryBuilder
from src.common.database.compatibility import db_get, db_query
from src.common.database.core.models import LLMUsage, Messages, OnlineTime
from src.common.logger import get_logger
from src.manager.async_task_manager import AsyncTask
@@ -299,21 +299,21 @@ class StatisticOutputTask(AsyncTask):
        # Use the earliest timestamp as the start time when fetching records
        # 🔧 Memory optimization: query in batches instead of loading everything at once
        query_start_time = collect_period[-1][1]

        query_builder = (
            QueryBuilder(LLMUsage)
            .no_cache()
            .filter(timestamp__gte=query_start_time)
            .order_by("-timestamp")
        )

        total_processed = 0
        async for batch in query_builder.iter_batches(batch_size=STAT_BATCH_SIZE, as_dict=True):
            for record in batch:
                if total_processed >= STAT_MAX_RECORDS:
                    logger.warning(f"Statistics processing reached the record cap {STAT_MAX_RECORDS}; skipping the remaining records")
                    break

                if not isinstance(record, dict):
                    continue
@@ -384,11 +384,11 @@ class StatisticOutputTask(AsyncTask):
                total_processed += 1
                if total_processed % 500 == 0:
                    await StatisticOutputTask._yield_control(total_processed, interval=1)

            # Check whether the cap has been reached
            if total_processed >= STAT_MAX_RECORDS:
                break

        # Yield control after each batch is processed
        await asyncio.sleep(0)
        # -- Compute derived metrics --
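
For reference, a minimal, self-contained sketch of the bounded batch-processing pattern the hunks above implement. The constant values are assumptions, fake_batches() is a hypothetical stand-in for QueryBuilder.iter_batches(), and asyncio.sleep(0) stands in for _yield_control(), which this diff does not show:

import asyncio

STAT_BATCH_SIZE = 1000     # rows fetched per query round-trip (assumed value)
STAT_MAX_RECORDS = 50_000  # hard cap on rows examined per run (assumed value)

async def fake_batches(total: int, batch_size: int):
    """Stand-in for QueryBuilder.iter_batches(): yields lists of row dicts."""
    for start in range(0, total, batch_size):
        yield [{"id": i} for i in range(start, min(start + batch_size, total))]

async def process_all() -> int:
    processed = 0
    async for batch in fake_batches(120_000, STAT_BATCH_SIZE):
        for record in batch:
            if processed >= STAT_MAX_RECORDS:
                return processed  # cap reached: stop scanning entirely
            # ... per-record aggregation would go here ...
            processed += 1
            if processed % 500 == 0:
                await asyncio.sleep(0)  # yield to the event loop periodically
        await asyncio.sleep(0)  # also yield between batches
    return processed

print(asyncio.run(process_all()))  # -> 50000
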
@@ -480,7 +480,7 @@ class StatisticOutputTask(AsyncTask):
            "labels": [item[0] for item in sorted_models],
            "data": [round(item[1], 4) for item in sorted_models],
        }

        # 1. Bar chart comparing input and output tokens
        model_names = list(period_stats[REQ_CNT_BY_MODEL].keys())
        if model_names:
@@ -489,7 +489,7 @@ class StatisticOutputTask(AsyncTask):
                "input_tokens": [period_stats[IN_TOK_BY_MODEL].get(m, 0) for m in model_names],
                "output_tokens": [period_stats[OUT_TOK_BY_MODEL].get(m, 0) for m in model_names],
            }

        # 2. Scatter-chart data for the response-time distribution (point count is capped to speed up loading)
        scatter_data = []
        max_points_per_model = 50  # at most 50 points per model
@@ -500,7 +500,7 @@ class StatisticOutputTask(AsyncTask):
                sampled_costs = time_costs[::step][:max_points_per_model]
            else:
                sampled_costs = time_costs

            for idx, time_cost in enumerate(sampled_costs):
                scatter_data.append({
                    "model": model_name,
@@ -509,7 +509,7 @@ class StatisticOutputTask(AsyncTask):
                    "tokens": period_stats[TOTAL_TOK_BY_MODEL].get(model_name, 0) // len(time_costs) if time_costs else 0
                })
        period_stats[SCATTER_CHART_RESPONSE_TIME] = scatter_data
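
The slice time_costs[::step][:max_points_per_model] above is an even-interval downsample. A standalone sketch of the same idea (the function name is hypothetical):

def downsample(values: list[float], max_points: int = 50) -> list[float]:
    """Keep at most max_points samples, spaced evenly across the list."""
    if len(values) <= max_points:
        return values
    step = len(values) // max_points   # stride between kept samples
    return values[::step][:max_points] # slice again to absorb rounding

print(len(downsample(list(map(float, range(1000))))))  # -> 50
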

        # 3. Radar chart of model efficiency
        if model_names:
            # Take the five most frequently used models
@@ -522,14 +522,14 @@ class StatisticOutputTask(AsyncTask):
                avg_time = period_stats[AVG_TIME_COST_BY_MODEL].get(model_name, 0)
                cost_per_ktok = period_stats[COST_PER_KTOK_BY_MODEL].get(model_name, 0)
                avg_tokens = period_stats[AVG_TOK_BY_MODEL].get(model_name, 0)

                # Simple normalization (time and cost are normalized inversely: smaller is better)
                max_req = max([period_stats[REQ_CNT_BY_MODEL].get(m[0], 1) for m in top_models])
                max_tps = max([period_stats[TPS_BY_MODEL].get(m[0], 1) for m in top_models])
                max_time = max([period_stats[AVG_TIME_COST_BY_MODEL].get(m[0], 0.1) for m in top_models])
                max_cost = max([period_stats[COST_PER_KTOK_BY_MODEL].get(m[0], 0.001) for m in top_models])
                max_tokens = max([period_stats[AVG_TOK_BY_MODEL].get(m[0], 1) for m in top_models])

                radar_data.append({
                    "model": model_name,
                    "metrics": [
@@ -544,7 +544,7 @@ class StatisticOutputTask(AsyncTask):
                "labels": ["Requests", "TPS", "Response speed", "Cost efficiency", "Token capacity"],
                "datasets": radar_data
            }
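
The metrics expression itself is elided from this hunk, so the exact formula is not shown. A plausible sketch of the inverse normalization the comment describes, with a hypothetical helper, might look like this: direct ratios for "bigger is better" metrics, inverted ratios for time and cost:

def radar_metrics(req, tps, avg_time, cost_per_ktok, avg_tokens,
                  max_req, max_tps, max_time, max_cost, max_tokens):
    return [
        req / max_req,                 # request volume: bigger is better
        tps / max_tps,                 # throughput: bigger is better
        1 - avg_time / max_time,       # response speed: smaller time scores higher
        1 - cost_per_ktok / max_cost,  # cost efficiency: cheaper scores higher
        avg_tokens / max_tokens,       # token capacity: bigger is better
    ]

print(radar_metrics(80, 50.0, 1.2, 0.002, 900, 100, 60.0, 2.0, 0.004, 1000))
# -> [0.8, 0.833..., 0.4, 0.5, 0.9]
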

        # 4. Doughnut chart of request share by provider
        provider_requests = period_stats[REQ_CNT_BY_PROVIDER]
        if provider_requests:
@@ -553,7 +553,7 @@ class StatisticOutputTask(AsyncTask):
            "labels": [item[0] for item in sorted_provider_reqs],
            "data": [item[1] for item in sorted_provider_reqs],
        }

        # 5. Bar chart of average response time
        if model_names:
            sorted_by_time = sorted(
@@ -626,7 +626,7 @@ class StatisticOutputTask(AsyncTask):
                    if overlap_end > overlap_start:
                        stats[period_key][ONLINE_TIME] += (overlap_end - overlap_start).total_seconds()
                    break

            # Yield control after each batch is processed
            await asyncio.sleep(0)
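
A minimal sketch of the interval-overlap rule the online-time accounting applies (the helper name is hypothetical): the overlap of two time ranges is [max(starts), min(ends)], and it only counts when it is non-empty:

from datetime import datetime, timedelta

def overlap_seconds(a_start: datetime, a_end: datetime,
                    b_start: datetime, b_end: datetime) -> float:
    overlap_start = max(a_start, b_start)
    overlap_end = min(a_end, b_end)
    if overlap_end > overlap_start:
        return (overlap_end - overlap_start).total_seconds()
    return 0.0

t0 = datetime(2024, 1, 1, 12, 0)
print(overlap_seconds(t0, t0 + timedelta(hours=2),
                      t0 + timedelta(hours=1), t0 + timedelta(hours=3)))  # -> 3600.0
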
@@ -666,7 +666,7 @@ class StatisticOutputTask(AsyncTask):
                if total_processed >= STAT_MAX_RECORDS:
                    logger.warning(f"Message statistics reached the record cap {STAT_MAX_RECORDS}; skipping the remaining records")
                    break

                if not isinstance(message, dict):
                    continue
                message_time_ts = message.get("time")  # This is a float timestamp
@@ -709,11 +709,11 @@ class StatisticOutputTask(AsyncTask):
                total_processed += 1
                if total_processed % 500 == 0:
                    await StatisticOutputTask._yield_control(total_processed, interval=1)

            # Check whether the cap has been reached
            if total_processed >= STAT_MAX_RECORDS:
                break

        # Yield control after each batch is processed
        await asyncio.sleep(0)
@@ -822,10 +822,10 @@ class StatisticOutputTask(AsyncTask):
    def _compress_time_cost_lists(self, data: dict[str, Any]) -> dict[str, Any]:
        """🔧 Memory optimization: compress the TIME_COST_BY_* lists into aggregates

        Original format:   {"model_a": [1.2, 2.3, 3.4, ...]}  (can grow without bound)
        Compressed format: {"model_a": {"sum": 6.9, "count": 3, "sum_sq": 18.29}}

        Merging then only needs to add up sum/count/sum_sq, so nothing grows without bound.
        avg = sum / count
        std = sqrt(sum_sq / count - (sum / count)^2)
@@ -835,17 +835,17 @@ class StatisticOutputTask(AsyncTask):
            TIME_COST_BY_TYPE, TIME_COST_BY_USER, TIME_COST_BY_MODEL,
            TIME_COST_BY_MODULE, TIME_COST_BY_PROVIDER
        ]

        result = dict(data)  # shallow copy

        for key in time_cost_keys:
            if key not in result:
                continue

            original = result[key]
            if not isinstance(original, dict):
                continue

            compressed = {}
            for sub_key, values in original.items():
                if isinstance(values, list):
@@ -863,9 +863,9 @@ class StatisticOutputTask(AsyncTask):
                else:
                    # Unknown format; keep the original value
                    compressed[sub_key] = values

            result[key] = compressed

        return result

    def _convert_defaultdict_to_dict(self, data):
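
A runnable sketch of the compression scheme from the docstring (the helper names compress/merge/stats are hypothetical): lists of per-request latencies collapse into three accumulators that merge by plain addition, and avg/std are recovered at the end using the formulas above:

import math

def compress(values: list[float]) -> dict[str, float]:
    return {
        "sum": sum(values),
        "count": len(values),
        "sum_sq": sum(v * v for v in values),
    }

def merge(a: dict[str, float], b: dict[str, float]) -> dict[str, float]:
    # Merging is plain addition, so memory stays O(1) per key.
    return {k: a[k] + b[k] for k in ("sum", "count", "sum_sq")}

def stats(c: dict[str, float]) -> tuple[float, float]:
    mean = c["sum"] / c["count"]
    return mean, math.sqrt(c["sum_sq"] / c["count"] - mean * mean)

m = merge(compress([1.2, 2.3]), compress([3.4]))
print(m)         # ≈ {'sum': 6.9, 'count': 3, 'sum_sq': 18.29}
print(stats(m))  # avg = 2.3, std ≈ 0.898
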
@@ -985,7 +985,7 @@ class StatisticOutputTask(AsyncTask):
            .filter(timestamp__gte=start_time)
            .order_by("-timestamp")
        )

        async for batch in llm_query_builder.iter_batches(batch_size=STAT_BATCH_SIZE, as_dict=True):
            for record in batch:
                if not isinstance(record, dict) or not record.get("timestamp"):
@@ -1010,7 +1010,7 @@ class StatisticOutputTask(AsyncTask):
                if module_name not in cost_by_module:
                    cost_by_module[module_name] = [0.0] * len(time_points)
                cost_by_module[module_name][idx] += cost

            await asyncio.sleep(0)

        # 🔧 Memory optimization: query Messages in batches
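
A sketch of this per-series time bucketing in isolation, as used for cost_by_module and message_by_chat: each series lazily gets a zeroed list with one slot per time point, and events accumulate into the slot (idx) their timestamp falls in. The hourly grid and the clamping rule are assumptions; the diff does not show how idx is derived:

from datetime import datetime, timedelta

start = datetime(2024, 1, 1)
time_points = [start + timedelta(hours=h) for h in range(24)]  # hourly buckets

def bucket_index(ts: datetime) -> int:
    # Clamp into range so early/late stragglers land in the edge buckets.
    idx = int((ts - start).total_seconds() // 3600)
    return min(max(idx, 0), len(time_points) - 1)

cost_by_module: dict[str, list[float]] = {}
for module_name, ts, cost in [("planner", start + timedelta(hours=2), 0.01),
                              ("planner", start + timedelta(hours=2, minutes=30), 0.02)]:
    if module_name not in cost_by_module:
        cost_by_module[module_name] = [0.0] * len(time_points)
    cost_by_module[module_name][bucket_index(ts)] += cost

print(cost_by_module["planner"][2])  # ≈ 0.03
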
@@ -1020,7 +1020,7 @@ class StatisticOutputTask(AsyncTask):
            .filter(time__gte=start_time.timestamp())
            .order_by("-time")
        )

        async for batch in msg_query_builder.iter_batches(batch_size=STAT_BATCH_SIZE, as_dict=True):
            for msg in batch:
                if not isinstance(msg, dict) or not msg.get("time"):
@@ -1040,7 +1040,7 @@ class StatisticOutputTask(AsyncTask):
                if chat_name not in message_by_chat:
                    message_by_chat[chat_name] = [0] * len(time_points)
                message_by_chat[chat_name][idx] += 1

            await asyncio.sleep(0)

        return {