+
+ 统计时段:
+ {start_time.strftime("%Y-%m-%d %H:%M:%S")} ~ {now.strftime("%Y-%m-%d %H:%M:%S")}
+
+
总在线时间: {_format_online_time(stat_data[ONLINE_TIME])}
+
总消息数: {stat_data[TOTAL_MSG_CNT]}
+
总请求数: {stat_data[TOTAL_REQ_CNT]}
+
总花费: {stat_data[TOTAL_COST]:.4f} ¥
+
+
按模型分类统计
+
+ | 模型名称 | 调用次数 | 输入Token | 输出Token | Token总量 | 累计花费 |
+
+ {model_rows}
+
+
+
+
按模块分类统计
+
+
+ | 模块名称 | 调用次数 | 输入Token | 输出Token | Token总量 | 累计花费 |
+
+
+ {module_rows}
+
+
+
+
按请求类型分类统计
+
+
+ | 请求类型 | 调用次数 | 输入Token | 输出Token | Token总量 | 累计花费 |
+
+
+ {type_rows}
+
+
+
+
聊天消息统计
+
+
+ | 联系人/群组名称 | 消息数量 |
+
+
+ {chat_rows}
+
+
+
+
+
+ """
+
+ tab_content_list = [
+ _format_stat_data(stat[period[0]], period[0], now - period[1])
+ for period in self.stat_period
+ if period[0] != "all_time"
+ ]
+
+ tab_content_list.append(
+ _format_stat_data(stat["all_time"], "all_time", datetime.fromtimestamp(local_storage["deploy_time"])) # type: ignore
+ )
+
+ # 不再添加版本对比内容
+ # 添加图表内容
+ chart_data = self._generate_chart_data(stat)
+ tab_content_list.append(self._generate_chart_tab(chart_data))
+
+ joined_tab_list = "\n".join(tab_list)
+ joined_tab_content = "\n".join(tab_content_list)
+
+ html_template = (
+ """
+
+
+
+
+
数据图表
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+ """
+
+
+class AsyncStatisticOutputTask(AsyncTask):
+ """完全异步的统计输出任务 - 更高性能版本"""
+
+ def __init__(self, record_file_path: str = "maibot_statistics.html"):
+ # 延迟0秒启动,运行间隔300秒
+ super().__init__(task_name="Async Statistics Data Output Task", wait_before_start=0, run_interval=300)
+
+ # 直接复用 StatisticOutputTask 的初始化逻辑
+ temp_stat_task = StatisticOutputTask(record_file_path)
+ self.name_mapping = temp_stat_task.name_mapping
+ self.record_file_path = temp_stat_task.record_file_path
+ self.stat_period = temp_stat_task.stat_period
+
+ async def run(self):
+ """完全异步执行统计任务"""
+
+ async def _async_collect_and_output():
+ try:
+ now = datetime.now()
+ loop = asyncio.get_event_loop()
+
+ with concurrent.futures.ThreadPoolExecutor() as executor:
+ logger.info("正在后台收集统计数据...")
+
+ # 数据收集任务
+ collect_task = asyncio.create_task(
+ loop.run_in_executor(executor, self._collect_all_statistics, now) # type: ignore
+ )
+
+ stats = await collect_task
+ logger.info("统计数据收集完成")
+
+ # 创建并发的输出任务
+ output_tasks = [
+ asyncio.create_task(loop.run_in_executor(executor, self._statistic_console_output, stats, now)), # type: ignore
+ asyncio.create_task(loop.run_in_executor(executor, self._generate_html_report, stats, now)), # type: ignore
+ ]
+
+ # 等待所有输出任务完成
+ await asyncio.gather(*output_tasks)
+
+ logger.info("统计数据后台输出完成")
+ except Exception as e:
+ logger.exception(f"后台统计数据输出过程中发生异常:{e}")
+
+ # 创建后台任务,立即返回
+ asyncio.create_task(_async_collect_and_output())
+
+ # 复用 StatisticOutputTask 的所有方法
+ def _collect_all_statistics(self, now: datetime):
+ return StatisticOutputTask._collect_all_statistics(self, now) # type: ignore
+
+ def _statistic_console_output(self, stats: Dict[str, Any], now: datetime):
+ return StatisticOutputTask._statistic_console_output(self, stats, now) # type: ignore
+
+ def _generate_html_report(self, stats: dict[str, Any], now: datetime):
+ return StatisticOutputTask._generate_html_report(self, stats, now) # type: ignore
+
+ # 其他需要的方法也可以类似复用...
+ @staticmethod
+ def _collect_model_request_for_period(collect_period: List[Tuple[str, datetime]]) -> Dict[str, Any]:
+ return StatisticOutputTask._collect_model_request_for_period(collect_period)
+
+ @staticmethod
+ def _collect_online_time_for_period(collect_period: List[Tuple[str, datetime]], now: datetime) -> Dict[str, Any]:
+ return StatisticOutputTask._collect_online_time_for_period(collect_period, now)
+
+ def _collect_message_count_for_period(self, collect_period: List[Tuple[str, datetime]]) -> Dict[str, Any]:
+ return StatisticOutputTask._collect_message_count_for_period(self, collect_period) # type: ignore
+
+ @staticmethod
+ def _format_total_stat(stats: Dict[str, Any]) -> str:
+ return StatisticOutputTask._format_total_stat(stats)
+
+ @staticmethod
+ def _format_model_classified_stat(stats: Dict[str, Any]) -> str:
+ return StatisticOutputTask._format_model_classified_stat(stats)
+
+ def _format_chat_stat(self, stats: Dict[str, Any]) -> str:
+ return StatisticOutputTask._format_chat_stat(self, stats) # type: ignore
+
+ def _generate_chart_data(self, stat: dict[str, Any]) -> dict:
+ return StatisticOutputTask._generate_chart_data(self, stat) # type: ignore
+
+ def _collect_interval_data(self, now: datetime, hours: int, interval_minutes: int) -> dict:
+ return StatisticOutputTask._collect_interval_data(self, now, hours, interval_minutes) # type: ignore
+
+ def _generate_chart_tab(self, chart_data: dict) -> str:
+ return StatisticOutputTask._generate_chart_tab(self, chart_data) # type: ignore
+
+ def _get_chat_display_name_from_id(self, chat_id: str) -> str:
+ return StatisticOutputTask._get_chat_display_name_from_id(self, chat_id) # type: ignore
+
+ def _convert_defaultdict_to_dict(self, data):
+ return StatisticOutputTask._convert_defaultdict_to_dict(self, data) # type: ignore
diff --git a/src/chat/utils/timer_calculator.py b/src/chat/utils/timer_calculator.py
new file mode 100644
index 000000000..d9479af16
--- /dev/null
+++ b/src/chat/utils/timer_calculator.py
@@ -0,0 +1,158 @@
+import asyncio
+
+from time import perf_counter
+from functools import wraps
+from typing import Optional, Dict, Callable
+from rich.traceback import install
+
+install(extra_lines=3)
+
+"""
+# 更好的计时器
+
+使用形式:
+- 上下文
+- 装饰器
+- 直接实例化
+
+使用场景:
+- 使用Timer:在需要测量代码执行时间时(如性能测试、计时器工具),Timer类是更可靠、高精度的选择。
+- 使用time.time()的场景:当需要记录实际时间点(如日志、时间戳)时使用,但避免用它测量时间间隔。
+
+使用方式:
+
+【装饰器】
+time_dict = {}
+@Timer("计数", time_dict)
+def func():
+ pass
+print(time_dict)
+
+【上下文_1】
+def func():
+ with Timer() as t:
+ pass
+ print(t)
+ print(t.human_readable)
+
+【上下文_2】
+def func():
+ time_dict = {}
+ with Timer("计数", time_dict):
+ pass
+ print(time_dict)
+
+【直接实例化】
+a = Timer()
+print(a) # 直接输出当前 perf_counter 值
+
+参数:
+- name:计时器的名字,默认为 None
+- storage:计时器结果存储字典,默认为 None
+- auto_unit:自动选择单位(毫秒或秒),默认为 True(自动根据时间切换毫秒或秒)
+- do_type_check:是否进行类型检查,默认为 False(不进行类型检查)
+
+属性:human_readable
+
+自定义错误:TimerTypeError
+"""
+
+
+class TimerTypeError(TypeError):
+ """自定义类型错误"""
+
+ __slots__ = ()
+
+ def __init__(self, param, expected_type, actual_type):
+ super().__init__(f"参数 '{param}' 类型错误,期望 {expected_type},实际得到 {actual_type.__name__}")
+
+
+class Timer:
+ """
+ Timer 支持三种模式:
+ 1. 装饰器模式:用于测量函数/协程运行时间
+ 2. 上下文管理器模式:用于 with 语句块内部计时
+ 3. 直接实例化:如果不调用 __enter__,打印对象时将显示当前 perf_counter 的值
+ """
+
+ __slots__ = ("name", "storage", "elapsed", "auto_unit", "start")
+
+ def __init__(
+ self,
+ name: Optional[str] = None,
+ storage: Optional[Dict[str, float]] = None,
+ auto_unit: bool = True,
+ do_type_check: bool = False,
+ ):
+ if do_type_check:
+ self._validate_types(name, storage)
+
+ self.name = name
+ self.storage = storage
+ self.elapsed: float = None # type: ignore
+
+ self.auto_unit = auto_unit
+ self.start: float = None # type: ignore
+
+ @staticmethod
+ def _validate_types(name, storage):
+ """类型检查"""
+ if name is not None and not isinstance(name, str):
+ raise TimerTypeError("name", "Optional[str]", type(name))
+
+ if storage is not None and not isinstance(storage, dict):
+ raise TimerTypeError("storage", "Optional[dict]", type(storage))
+
+ def __call__(self, func: Optional[Callable] = None) -> Callable:
+ """装饰器模式"""
+ if func is None:
+ return lambda f: Timer(name=self.name or f.__name__, storage=self.storage, auto_unit=self.auto_unit)(f)
+
+ @wraps(func)
+ async def async_wrapper(*args, **kwargs):
+ with self:
+ return await func(*args, **kwargs)
+ return None
+
+ @wraps(func)
+ def sync_wrapper(*args, **kwargs):
+ with self:
+ return func(*args, **kwargs)
+ return None
+
+ wrapper = async_wrapper if asyncio.iscoroutinefunction(func) else sync_wrapper
+ wrapper.__timer__ = self # 保留计时器引用 # type: ignore
+ return wrapper
+
+ def __enter__(self):
+ """上下文管理器入口"""
+ self.start = perf_counter()
+ return self
+
+ def __exit__(self, *args):
+ self.elapsed = perf_counter() - self.start
+ self._record_time()
+ return False
+
+ def _record_time(self):
+ """记录时间"""
+ if self.storage is not None and self.name:
+ self.storage[self.name] = self.elapsed
+
+ @property
+ def human_readable(self) -> str:
+ """人类可读时间格式"""
+ if self.elapsed is None:
+ return "未计时"
+
+ if self.auto_unit:
+ return f"{self.elapsed * 1000:.2f}毫秒" if self.elapsed < 1 else f"{self.elapsed:.2f}秒"
+ return f"{self.elapsed:.4f}秒"
+
+ def __str__(self):
+ if self.start is not None:
+ if self.elapsed is None:
+ current_elapsed = perf_counter() - self.start
+ return f"