fix: 修复统计时段异常的问题

This commit is contained in:
Oct-autumn
2025-05-09 02:37:52 +08:00
parent a1fbff1d6b
commit 13db955454

View File

@@ -103,7 +103,7 @@ class StatisticOutputTask(AsyncTask):
def __init__(self, record_file_path: str = "llm_statistics.txt"): def __init__(self, record_file_path: str = "llm_statistics.txt"):
# 延迟300秒启动运行间隔300秒 # 延迟300秒启动运行间隔300秒
super().__init__(task_name="Statistics Data Output Task", wait_before_start=300, run_interval=300) super().__init__(task_name="Statistics Data Output Task", wait_before_start=0, run_interval=300)
self.name_mapping: Dict[str, Tuple[str, float]] = {} self.name_mapping: Dict[str, Tuple[str, float]] = {}
""" """
@@ -117,25 +117,35 @@ class StatisticOutputTask(AsyncTask):
""" """
now = datetime.now() now = datetime.now()
self.stat_period: List[Tuple[str, datetime, str]] = [ if "deploy_time" in local_storage:
("all_time", datetime(2000, 1, 1), "自部署以来的"), # 如果存在部署时间,则使用该时间作为全量统计的起始时间
("last_7_days", now - timedelta(days=7), "最近7天的"), deploy_time = datetime.fromtimestamp(local_storage["deploy_time"])
("last_24_hours", now - timedelta(days=1), "最近24小时的"), else:
("last_hour", now - timedelta(hours=1), "最近1小时的"), # 否则,使用最大时间范围,并记录部署时间为当前时间
deploy_time = datetime(2000, 1, 1)
local_storage["deploy_time"] = now.timestamp()
self.stat_period: List[Tuple[str, timedelta, str]] = [
("all_time", now - deploy_time, "自部署以来的"),
("last_7_days", timedelta(days=7), "最近7天的"),
("last_24_hours", timedelta(days=1), "最近24小时的"),
("last_hour", timedelta(hours=1), "最近1小时的"),
] ]
""" """
统计时间段 统计时间段 [(统计名称, 统计时间段, 统计描述), ...]
""" """
def _statistic_console_output(self, stats: Dict[str, Any]): def _statistic_console_output(self, stats: Dict[str, Any], now: datetime):
""" """
输出统计数据到控制台 输出统计数据到控制台
:param stats: 统计数据
:param now: 基准当前时间
""" """
# 输出最近一小时的统计数据 # 输出最近一小时的统计数据
output = [ output = [
self.SEP_LINE, self.SEP_LINE,
f" 最近1小时的统计数据 (详细信息见文件:{self.record_file_path})", f" 最近1小时的统计数据 ({now.strftime('%Y-%m-%d %H:%M:%S')}开始,详细信息见文件:{self.record_file_path})",
self.SEP_LINE, self.SEP_LINE,
self._format_total_stat(stats["last_hour"]), self._format_total_stat(stats["last_hour"]),
"", "",
@@ -148,11 +158,11 @@ class StatisticOutputTask(AsyncTask):
logger.info("\n" + "\n".join(output)) logger.info("\n" + "\n".join(output))
def _statistic_file_output(self, stats: Dict[str, Any]): def _statistic_file_output(self, stats: Dict[str, Any], now: datetime):
""" """
输出统计数据到文件 输出统计数据到文件
""" """
output = [f"MaiBot运行统计报告 (生成时间:{datetime.now().strftime('%Y-%m-%d %H:%M:%S')})", ""] output = [f"MaiBot运行统计报告 (统计截止时间:{now.strftime('%Y-%m-%d %H:%M:%S')})", ""]
def _format_stat_data(title: str, stats_: Dict[str, Any]) -> str: def _format_stat_data(title: str, stats_: Dict[str, Any]) -> str:
""" """
@@ -172,16 +182,21 @@ class StatisticOutputTask(AsyncTask):
self._format_user_classified_stat(stats_), self._format_user_classified_stat(stats_),
"", "",
self._format_chat_stat(stats_), self._format_chat_stat(stats_),
"",
] ]
) )
for period_key, period_start_time, period_desc in self.stat_period: for period_key, period_interval, period_desc in self.stat_period:
if period_key in stats: if period_key in stats:
start_time = (
datetime.fromtimestamp(local_storage["deploy_time"])
if period_key == "all_time"
else now - period_interval
)
# 统计数据存在 # 统计数据存在
output.append( output.append(
_format_stat_data( _format_stat_data(
f"{period_desc}统计数据 (自{period_start_time.strftime('%Y-%m-%d %H:%M:%S')}开始)", f"{period_desc}统计数据 "
f"(统计时段:{start_time.strftime('%Y-%m-%d %H:%M:%S')} ~ {now.strftime('%Y-%m-%d %H:%M:%S')})",
stats[period_key], stats[period_key],
) )
) )
@@ -191,20 +206,21 @@ class StatisticOutputTask(AsyncTask):
async def run(self): async def run(self):
try: try:
now = datetime.now()
# 收集统计数据 # 收集统计数据
stats = self._collect_all_statistics() stats = self._collect_all_statistics(now)
# 输出统计数据到控制台 # 输出统计数据到控制台
self._statistic_console_output(stats) self._statistic_console_output(stats, now)
# 输出统计数据到文件 # 输出统计数据到文件
self._statistic_file_output(stats) self._statistic_file_output(stats, now)
except Exception as e: except Exception as e:
logger.exception(f"输出统计数据过程中发生异常,错误信息:{e}") logger.exception(f"输出统计数据过程中发生异常,错误信息:{e}")
# -- 以下为统计数据收集方法 -- # -- 以下为统计数据收集方法 --
@staticmethod @staticmethod
def _collect_model_request_for_period(collect_period: List[Tuple[str, datetime, str]]) -> Dict[str, Any]: def _collect_model_request_for_period(collect_period: List[Tuple[str, datetime]]) -> Dict[str, Any]:
""" """
收集指定时间段的LLM请求统计数据 收集指定时间段的LLM请求统计数据
@@ -243,17 +259,17 @@ class StatisticOutputTask(AsyncTask):
COST_BY_USER: defaultdict(float), COST_BY_USER: defaultdict(float),
COST_BY_MODEL: defaultdict(float), COST_BY_MODEL: defaultdict(float),
} }
for period_key, _, _ in collect_period for period_key, _ in collect_period
} }
# 以最早的时间戳为起始时间获取记录 # 以最早的时间戳为起始时间获取记录
for record in db.llm_usage.find({"timestamp": {"$gte": collect_period[-1][1]}}): for record in db.llm_usage.find({"timestamp": {"$gte": collect_period[-1][1]}}):
record_timestamp = record.get("timestamp") record_timestamp = record.get("timestamp")
for idx, (_, period_start, _) in enumerate(collect_period): for idx, (_, period_start) in enumerate(collect_period):
if record_timestamp >= period_start: if record_timestamp >= period_start:
# 如果记录时间在当前时间段内,则它一定在更早的时间段内 # 如果记录时间在当前时间段内,则它一定在更早的时间段内
# 因此,我们可以直接跳过更早的时间段的判断,直接更新当前以及更早时间段的统计数据 # 因此,我们可以直接跳过更早的时间段的判断,直接更新当前以及更早时间段的统计数据
for period_key, _, _ in collect_period[idx:]: for period_key, _ in collect_period[idx:]:
stats[period_key][TOTAL_REQ_CNT] += 1 stats[period_key][TOTAL_REQ_CNT] += 1
request_type = record.get("request_type", "unknown") # 请求类型 request_type = record.get("request_type", "unknown") # 请求类型
@@ -290,7 +306,7 @@ class StatisticOutputTask(AsyncTask):
return stats return stats
@staticmethod @staticmethod
def _collect_online_time_for_period(collect_period: List[Tuple[str, datetime, str]]) -> Dict[str, Any]: def _collect_online_time_for_period(collect_period: List[Tuple[str, datetime]], now: datetime) -> Dict[str, Any]:
""" """
收集指定时间段的在线时间统计数据 收集指定时间段的在线时间统计数据
@@ -307,17 +323,20 @@ class StatisticOutputTask(AsyncTask):
# 在线时间统计 # 在线时间统计
ONLINE_TIME: 0.0, ONLINE_TIME: 0.0,
} }
for period_key, _, _ in collect_period for period_key, _ in collect_period
} }
# 统计在线时间 # 统计在线时间
for record in db.online_time.find({"end_timestamp": {"$gte": collect_period[-1][1]}}): for record in db.online_time.find({"end_timestamp": {"$gte": collect_period[-1][1]}}):
end_timestamp: datetime = record.get("end_timestamp") end_timestamp: datetime = record.get("end_timestamp")
for idx, (_, period_start, _) in enumerate(collect_period): for idx, (_, period_start) in enumerate(collect_period):
if end_timestamp >= period_start: if end_timestamp >= period_start:
# 由于end_timestamp会超前标记时间所以我们需要判断是否晚于当前时间如果是则使用当前时间作为结束时间
if end_timestamp > now:
end_timestamp = now
# 如果记录时间在当前时间段内,则它一定在更早的时间段内 # 如果记录时间在当前时间段内,则它一定在更早的时间段内
# 因此,我们可以直接跳过更早的时间段的判断,直接更新当前以及更早时间段的统计数据 # 因此,我们可以直接跳过更早的时间段的判断,直接更新当前以及更早时间段的统计数据
for period_key, _period_start, _ in collect_period[idx:]: for period_key, _period_start in collect_period[idx:]:
start_timestamp: datetime = record.get("start_timestamp") start_timestamp: datetime = record.get("start_timestamp")
if start_timestamp < _period_start: if start_timestamp < _period_start:
# 如果开始时间在查询边界之前,则使用开始时间 # 如果开始时间在查询边界之前,则使用开始时间
@@ -329,7 +348,7 @@ class StatisticOutputTask(AsyncTask):
return stats return stats
def _collect_message_count_for_period(self, collect_period: List[Tuple[str, datetime, str]]) -> Dict[str, Any]: def _collect_message_count_for_period(self, collect_period: List[Tuple[str, datetime]]) -> Dict[str, Any]:
""" """
收集指定时间段的消息统计数据 收集指定时间段的消息统计数据
@@ -347,7 +366,7 @@ class StatisticOutputTask(AsyncTask):
TOTAL_MSG_CNT: 0, TOTAL_MSG_CNT: 0,
MSG_CNT_BY_CHAT: defaultdict(int), MSG_CNT_BY_CHAT: defaultdict(int),
} }
for period_key, _, _ in collect_period for period_key, _ in collect_period
} }
# 统计消息量 # 统计消息量
@@ -375,42 +394,43 @@ class StatisticOutputTask(AsyncTask):
else: else:
self.name_mapping[chat_id] = (chat_name, message_time) self.name_mapping[chat_id] = (chat_name, message_time)
for idx, (_, period_start, _) in enumerate(collect_period): for idx, (_, period_start) in enumerate(collect_period):
if message_time >= period_start.timestamp(): if message_time >= period_start.timestamp():
# 如果记录时间在当前时间段内,则它一定在更早的时间段内 # 如果记录时间在当前时间段内,则它一定在更早的时间段内
# 因此,我们可以直接跳过更早的时间段的判断,直接更新当前以及更早时间段的统计数据 # 因此,我们可以直接跳过更早的时间段的判断,直接更新当前以及更早时间段的统计数据
for period_key, _, _ in collect_period[idx:]: for period_key, _ in collect_period[idx:]:
stats[period_key][TOTAL_MSG_CNT] += 1 stats[period_key][TOTAL_MSG_CNT] += 1
stats[period_key][MSG_CNT_BY_CHAT][chat_id] += 1 stats[period_key][MSG_CNT_BY_CHAT][chat_id] += 1
break break
return stats return stats
def _collect_all_statistics(self) -> Dict[str, Dict[str, Any]]: def _collect_all_statistics(self, now: datetime) -> Dict[str, Dict[str, Any]]:
""" """
收集各时间段的统计数据 收集各时间段的统计数据
:param now: 基准当前时间
""" """
now = datetime.now()
last_all_time_stat = None last_all_time_stat = None
stat = {period[0]: {} for period in self.stat_period}
if "last_full_statistics_timestamp" in local_storage and "last_full_statistics" in local_storage: if "last_full_statistics_timestamp" in local_storage and "last_full_statistics" in local_storage:
# 若存有上次完整统计的时间戳,则使用该时间戳作为"所有时间"的起始时间,进行增量统计 # 若存有上次完整统计的时间戳,则使用该时间戳作为"所有时间"的起始时间,进行增量统计
last_full_stat_ts: float = local_storage["last_full_statistics_timestamp"] last_full_stat_ts: float = local_storage["last_full_statistics_timestamp"]
last_all_time_stat = local_storage["last_full_statistics"] last_all_time_stat = local_storage["last_full_statistics"]
self.stat_period = [item for item in self.stat_period if item[0] != "all_time"] # 删除"所有时间"的统计时段 self.stat_period = [item for item in self.stat_period if item[0] != "all_time"] # 删除"所有时间"的统计时段
self.stat_period.append(("all_time", datetime.fromtimestamp(last_full_stat_ts), "自部署以来的")) self.stat_period.append(("all_time", now - datetime.fromtimestamp(last_full_stat_ts), "自部署以来的"))
model_req_stat = self._collect_model_request_for_period(self.stat_period) stat_start_timestamp = [(period[0], now - period[1]) for period in self.stat_period]
online_time_stat = self._collect_online_time_for_period(self.stat_period)
message_count_stat = self._collect_message_count_for_period(self.stat_period) stat = {item[0]: {} for item in self.stat_period}
model_req_stat = self._collect_model_request_for_period(stat_start_timestamp)
online_time_stat = self._collect_online_time_for_period(stat_start_timestamp, now)
message_count_stat = self._collect_message_count_for_period(stat_start_timestamp)
# 统计数据合并 # 统计数据合并
# 合并三类统计数据 # 合并三类统计数据
for period_key, _, _ in self.stat_period: for period_key, _ in stat_start_timestamp:
stat[period_key].update(model_req_stat[period_key]) stat[period_key].update(model_req_stat[period_key])
stat[period_key].update(online_time_stat[period_key]) stat[period_key].update(online_time_stat[period_key])
stat[period_key].update(message_count_stat[period_key]) stat[period_key].update(message_count_stat[period_key])