from collections import defaultdict
from datetime import datetime, timedelta
from typing import Any, Dict, Tuple, List
import asyncio
import concurrent.futures

from src.common.logger import get_logger
from src.manager.async_task_manager import AsyncTask
from ...common.database.database import db  # This db is the Peewee database instance
from ...common.database.database_model import OnlineTime, LLMUsage, Messages  # Import the Peewee models
from src.manager.local_store_manager import local_storage

logger = get_logger("maibot_statistic")

# HFC统计相关的键
HFC_TOTAL_CYCLES = "hfc_total_cycles"
HFC_CYCLES_BY_CHAT = "hfc_cycles_by_chat"
HFC_CYCLES_BY_ACTION = "hfc_cycles_by_action"
HFC_CYCLES_BY_VERSION = "hfc_cycles_by_version"
HFC_AVG_TIME_BY_CHAT = "hfc_avg_time_by_chat"
HFC_AVG_TIME_BY_ACTION = "hfc_avg_time_by_action"
HFC_AVG_TIME_BY_VERSION = "hfc_avg_time_by_version"
HFC_ACTIONS_BY_CHAT = "hfc_actions_by_chat"  # 群聊×动作交叉统计

# 统计数据的键
TOTAL_REQ_CNT = "total_requests"
TOTAL_COST = "total_cost"
REQ_CNT_BY_TYPE = "requests_by_type"
REQ_CNT_BY_USER = "requests_by_user"
REQ_CNT_BY_MODEL = "requests_by_model"
REQ_CNT_BY_MODULE = "requests_by_module"
IN_TOK_BY_TYPE = "in_tokens_by_type"
IN_TOK_BY_USER = "in_tokens_by_user"
IN_TOK_BY_MODEL = "in_tokens_by_model"
IN_TOK_BY_MODULE = "in_tokens_by_module"
OUT_TOK_BY_TYPE = "out_tokens_by_type"
OUT_TOK_BY_USER = "out_tokens_by_user"
OUT_TOK_BY_MODEL = "out_tokens_by_model"
OUT_TOK_BY_MODULE = "out_tokens_by_module"
TOTAL_TOK_BY_TYPE = "tokens_by_type"
TOTAL_TOK_BY_USER = "tokens_by_user"
TOTAL_TOK_BY_MODEL = "tokens_by_model"
TOTAL_TOK_BY_MODULE = "tokens_by_module"
COST_BY_TYPE = "costs_by_type"
COST_BY_USER = "costs_by_user"
COST_BY_MODEL = "costs_by_model"
COST_BY_MODULE = "costs_by_module"
ONLINE_TIME = "online_time"
TOTAL_MSG_CNT = "total_messages"
MSG_CNT_BY_CHAT = "messages_by_chat"


class OnlineTimeRecordTask(AsyncTask):
    """在线时间记录任务"""

    def __init__(self):
        super().__init__(task_name="Online Time Record Task", run_interval=60)

        self.record_id: int | None = None  # Peewee 默认自增主键为 int
        """记录ID"""

        self._init_database()  # 初始化数据库

    @staticmethod
    def _init_database():
        """初始化数据库"""
        with db.atomic():  # Use atomic operations for schema changes
            OnlineTime.create_table(safe=True)  # Creates table if it doesn't exist; indexes come from the model

    async def run(self):
        try:
            current_time = datetime.now()
            extended_end_time = current_time + timedelta(minutes=1)

            if self.record_id:
                # 如果有记录,则更新结束时间
                query = OnlineTime.update(end_timestamp=extended_end_time).where(OnlineTime.id == self.record_id)
                updated_rows = query.execute()
                if updated_rows == 0:
                    # Record might have been deleted or ID is stale, try to find/create
                    self.record_id = None  # Reset record_id to trigger find/create logic below

            if not self.record_id:  # Check again if record_id was reset or initially None
                # 如果没有记录,检查一分钟以内是否已有记录
                # Look for a record whose end_timestamp is recent enough to be considered ongoing
                recent_record = (
                    OnlineTime.select()
                    .where(OnlineTime.end_timestamp >= (current_time - timedelta(minutes=1)))
                    .order_by(OnlineTime.end_timestamp.desc())
                    .first()
                )
                if recent_record:
                    # 如果有记录,则更新结束时间
                    self.record_id = recent_record.id
                    recent_record.end_timestamp = extended_end_time
                    recent_record.save()
                else:
                    # 若没有记录,则插入新的在线时间记录
                    new_record = OnlineTime.create(
                        timestamp=current_time.timestamp(),
                        start_timestamp=current_time,
                        end_timestamp=extended_end_time,
                        duration=5,  # 初始时长为5分钟
                    )
                    self.record_id = new_record.id
        except Exception as e:
            logger.error(f"在线时间记录失败,错误信息:{e}")

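
# 速查(假设性示意):下面是根据本文件中对 OnlineTime 的读写方式推断出的字段草图,
# 仅用于辅助阅读,实际字段定义以 database_model.OnlineTime 为准。
#
#     class OnlineTime(Model):               # 假设性草图,非真实定义
#         id = AutoField()                   # 主键,run() 中缓存为 self.record_id
#         timestamp = DoubleField()          # 创建记录时写入 current_time.timestamp()
#         start_timestamp = DateTimeField()  # 在线区间开始
#         end_timestamp = DateTimeField()    # 在线区间结束,每次 run() 顺延约 1 分钟
#         duration = IntegerField()          # 创建时写入的初始时长
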
def _format_online_time(online_seconds: int) -> str:
    """
    格式化在线时间
    :param online_seconds: 在线时间(秒)
    :return: 格式化后的在线时间字符串
    """
    total_online_time = timedelta(seconds=online_seconds)

    days = total_online_time.days
    hours = total_online_time.seconds // 3600
    minutes = (total_online_time.seconds // 60) % 60
    seconds = total_online_time.seconds % 60
    if days > 0:
        # 如果在线时间超过1天,则格式化为"X天X小时X分钟X秒"
        return f"{days}天{hours}小时{minutes}分钟{seconds}秒"
    elif hours > 0:
        # 如果在线时间超过1小时,则格式化为"X小时X分钟X秒"
        return f"{hours}小时{minutes}分钟{seconds}秒"
    else:
        # 其他情况格式化为"X分钟X秒"
        return f"{minutes}分钟{seconds}秒"

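
# 用法示意(手工推算的示例输出,非真实运行记录):
#   _format_online_time(59)    -> "0分钟59秒"
#   _format_online_time(3725)  -> "1小时2分钟5秒"
#   _format_online_time(90061) -> "1天1小时1分钟1秒"
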

class StatisticOutputTask(AsyncTask):
    """统计输出任务"""

    SEP_LINE = "-" * 84

    def __init__(self, record_file_path: str = "maibot_statistics.html"):
        # 运行间隔300秒(wait_before_start=0,启动后立即执行首轮统计)
        super().__init__(task_name="Statistics Data Output Task", wait_before_start=0, run_interval=300)

        self.name_mapping: Dict[str, Tuple[str, float]] = {}
        """
        联系人/群聊名称映射
        {聊天ID: (联系人/群聊名称, 记录时间(timestamp))}
        注:设计记录时间的目的是方便更新名称,使联系人/群聊名称保持最新
        """

        self.record_file_path: str = record_file_path
        """记录文件路径"""

        now = datetime.now()
        if "deploy_time" in local_storage:
            # 如果存在部署时间,则使用该时间作为全量统计的起始时间
            deploy_time = datetime.fromtimestamp(local_storage["deploy_time"])
        else:
            # 否则,使用最大时间范围,并记录部署时间为当前时间
            deploy_time = datetime(2000, 1, 1)
            local_storage["deploy_time"] = now.timestamp()

        self.stat_period: List[Tuple[str, timedelta, str]] = [
            ("all_time", now - deploy_time, "自部署以来"),  # 必须保留"all_time"
            ("last_7_days", timedelta(days=7), "最近7天"),
            ("last_24_hours", timedelta(days=1), "最近24小时"),
            ("last_3_hours", timedelta(hours=3), "最近3小时"),
            ("last_hour", timedelta(hours=1), "最近1小时"),
        ]
        """
        统计时间段
        [(统计名称, 统计时间段, 统计描述), ...]
        """

    def _statistic_console_output(self, stats: Dict[str, Any], now: datetime):
        """
        输出统计数据到控制台
        :param stats: 统计数据
        :param now: 基准当前时间
        """
        # 输出最近一小时的统计数据
        output = [
            self.SEP_LINE,
            f" 最近1小时的统计数据 (自{now.strftime('%Y-%m-%d %H:%M:%S')}开始,详细信息见文件:{self.record_file_path})",
            self.SEP_LINE,
            self._format_total_stat(stats["last_hour"]),
            "",
            self._format_model_classified_stat(stats["last_hour"]),
            "",
            self._format_chat_stat(stats["last_hour"]),
            self.SEP_LINE,
            "",
        ]
        logger.info("\n" + "\n".join(output))

    async def run(self):
        try:
            now = datetime.now()

            # 使用线程池并行执行耗时操作
            loop = asyncio.get_event_loop()

            # 在线程池中并行执行数据收集和之前的HTML生成(如果存在)
            with concurrent.futures.ThreadPoolExecutor() as executor:
                logger.info("正在收集统计数据...")

                # 数据收集任务
                collect_task = loop.run_in_executor(executor, self._collect_all_statistics, now)

                # 等待数据收集完成
                stats = await collect_task
                logger.info("统计数据收集完成")

                # 并行执行控制台输出和HTML报告生成
                console_task = loop.run_in_executor(executor, self._statistic_console_output, stats, now)
                html_task = loop.run_in_executor(executor, self._generate_html_report, stats, now)

                # 等待两个输出任务完成
                await asyncio.gather(console_task, html_task)
                logger.info("统计数据输出完成")

        except Exception as e:
            logger.exception(f"输出统计数据过程中发生异常,错误信息:{e}")

    async def run_async_background(self):
        """
        备选方案:完全异步后台运行统计输出
        使用此方法可以让统计任务完全非阻塞
        """

        async def _async_collect_and_output():
            try:
                now = datetime.now()
                loop = asyncio.get_event_loop()

                with concurrent.futures.ThreadPoolExecutor() as executor:
                    logger.info("正在后台收集统计数据...")

                    # 创建后台任务,不等待完成
                    collect_task = asyncio.create_task(
                        loop.run_in_executor(executor, self._collect_all_statistics, now)
                    )

                    stats = await collect_task
                    logger.info("统计数据收集完成")

                    # 创建并发的输出任务
                    output_tasks = [
                        asyncio.create_task(
                            loop.run_in_executor(executor, self._statistic_console_output, stats, now)
                        ),
                        asyncio.create_task(loop.run_in_executor(executor, self._generate_html_report, stats, now)),
                    ]

                    # 等待所有输出任务完成
                    await asyncio.gather(*output_tasks)
                    logger.info("统计数据后台输出完成")

            except Exception as e:
                logger.exception(f"后台统计数据输出过程中发生异常:{e}")

        # 创建后台任务,立即返回
        asyncio.create_task(_async_collect_and_output())

    # -- 以下为统计数据收集方法 --

    @staticmethod
    def _collect_model_request_for_period(collect_period: List[Tuple[str, datetime]]) -> Dict[str, Any]:
        """
        收集指定时间段的LLM请求统计数据
        :param collect_period: 统计时间段
        """
        if not collect_period:
            return {}

        # 排序-按照时间段开始时间降序排列(最晚的时间段在前)
        collect_period.sort(key=lambda x: x[1], reverse=True)

        stats = {
            period_key: {
                TOTAL_REQ_CNT: 0,
                REQ_CNT_BY_TYPE: defaultdict(int),
                REQ_CNT_BY_USER: defaultdict(int),
                REQ_CNT_BY_MODEL: defaultdict(int),
                REQ_CNT_BY_MODULE: defaultdict(int),
                IN_TOK_BY_TYPE: defaultdict(int),
                IN_TOK_BY_USER: defaultdict(int),
                IN_TOK_BY_MODEL: defaultdict(int),
                IN_TOK_BY_MODULE: defaultdict(int),
                OUT_TOK_BY_TYPE: defaultdict(int),
                OUT_TOK_BY_USER: defaultdict(int),
                OUT_TOK_BY_MODEL: defaultdict(int),
                OUT_TOK_BY_MODULE: defaultdict(int),
                TOTAL_TOK_BY_TYPE: defaultdict(int),
                TOTAL_TOK_BY_USER: defaultdict(int),
                TOTAL_TOK_BY_MODEL: defaultdict(int),
                TOTAL_TOK_BY_MODULE: defaultdict(int),
                TOTAL_COST: 0.0,
                COST_BY_TYPE: defaultdict(float),
                COST_BY_USER: defaultdict(float),
                COST_BY_MODEL: defaultdict(float),
                COST_BY_MODULE: defaultdict(float),
            }
            for period_key, _ in collect_period
        }

        # 以最早的时间戳为起始时间获取记录
        # Assuming LLMUsage.timestamp is a DateTimeField
        query_start_time = collect_period[-1][1]

        for record in LLMUsage.select().where(LLMUsage.timestamp >= query_start_time):
            record_timestamp = record.timestamp  # This is already a datetime object
            for idx, (_, period_start) in enumerate(collect_period):
                if record_timestamp >= period_start:
                    for period_key, _ in collect_period[idx:]:
                        stats[period_key][TOTAL_REQ_CNT] += 1

                        request_type = record.request_type or "unknown"
                        user_id = record.user_id or "unknown"  # user_id is TextField, already string
                        model_name = record.model_name or "unknown"
                        # 提取模块名:如果请求类型包含".",取第一个"."之前的部分
                        module_name = request_type.split(".")[0] if "." in request_type else request_type

                        stats[period_key][REQ_CNT_BY_TYPE][request_type] += 1
                        stats[period_key][REQ_CNT_BY_USER][user_id] += 1
                        stats[period_key][REQ_CNT_BY_MODEL][model_name] += 1
                        stats[period_key][REQ_CNT_BY_MODULE][module_name] += 1

                        prompt_tokens = record.prompt_tokens or 0
                        completion_tokens = record.completion_tokens or 0
                        total_tokens = prompt_tokens + completion_tokens

                        stats[period_key][IN_TOK_BY_TYPE][request_type] += prompt_tokens
                        stats[period_key][IN_TOK_BY_USER][user_id] += prompt_tokens
                        stats[period_key][IN_TOK_BY_MODEL][model_name] += prompt_tokens
                        stats[period_key][IN_TOK_BY_MODULE][module_name] += prompt_tokens

                        stats[period_key][OUT_TOK_BY_TYPE][request_type] += completion_tokens
                        stats[period_key][OUT_TOK_BY_USER][user_id] += completion_tokens
                        stats[period_key][OUT_TOK_BY_MODEL][model_name] += completion_tokens
                        stats[period_key][OUT_TOK_BY_MODULE][module_name] += completion_tokens

                        stats[period_key][TOTAL_TOK_BY_TYPE][request_type] += total_tokens
                        stats[period_key][TOTAL_TOK_BY_USER][user_id] += total_tokens
                        stats[period_key][TOTAL_TOK_BY_MODEL][model_name] += total_tokens
                        stats[period_key][TOTAL_TOK_BY_MODULE][module_name] += total_tokens

                        cost = record.cost or 0.0
                        stats[period_key][TOTAL_COST] += cost
                        stats[period_key][COST_BY_TYPE][request_type] += cost
                        stats[period_key][COST_BY_USER][user_id] += cost
                        stats[period_key][COST_BY_MODEL][model_name] += cost
                        stats[period_key][COST_BY_MODULE][module_name] += cost
                    break

        return stats
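
    # 时间段归类示意(假设性例子):collect_period 已按开始时间降序排列为
    # [last_hour, last_3_hours, last_24_hours, last_7_days, all_time],所有时间段都以 now 结束。
    #   - 记录时间为 now-30分钟:首先命中 last_hour(idx=0),随后累加进 collect_period[0:] 的全部时间段;
    #   - 记录时间为 now-2小时:last_hour 不满足,命中 last_3_hours(idx=1),
    #     于是累加进 last_3_hours、last_24_hours、last_7_days、all_time;
    #   - 由于开始时间更早的时间段必然包含开始时间更晚的时间段,找到首个命中后即可 break。
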

    @staticmethod
    def _collect_online_time_for_period(collect_period: List[Tuple[str, datetime]], now: datetime) -> Dict[str, Any]:
        """
        收集指定时间段的在线时间统计数据
        :param collect_period: 统计时间段
        """
        if not collect_period:
            return {}

        collect_period.sort(key=lambda x: x[1], reverse=True)

        stats = {
            period_key: {
                ONLINE_TIME: 0.0,
            }
            for period_key, _ in collect_period
        }

        query_start_time = collect_period[-1][1]
        # Assuming OnlineTime.end_timestamp is a DateTimeField
        for record in OnlineTime.select().where(OnlineTime.end_timestamp >= query_start_time):
            # record.end_timestamp and record.start_timestamp are datetime objects
            record_end_timestamp = record.end_timestamp
            record_start_timestamp = record.start_timestamp

            for idx, (_, period_boundary_start) in enumerate(collect_period):
                if record_end_timestamp >= period_boundary_start:
                    # Calculate effective end time for this record in relation to 'now'
                    effective_end_time = min(record_end_timestamp, now)

                    for period_key, current_period_start_time in collect_period[idx:]:
                        # Determine the portion of the record that falls within this specific statistical period
                        overlap_start = max(record_start_timestamp, current_period_start_time)
                        overlap_end = effective_end_time  # Already capped by 'now' and record's own end

                        if overlap_end > overlap_start:
                            stats[period_key][ONLINE_TIME] += (overlap_end - overlap_start).total_seconds()
                    break

        return stats

    def _collect_message_count_for_period(self, collect_period: List[Tuple[str, datetime]]) -> Dict[str, Any]:
        """
        收集指定时间段的消息统计数据
        :param collect_period: 统计时间段
        """
        if not collect_period:
            return {}

        collect_period.sort(key=lambda x: x[1], reverse=True)

        stats = {
            period_key: {
                TOTAL_MSG_CNT: 0,
                MSG_CNT_BY_CHAT: defaultdict(int),
            }
            for period_key, _ in collect_period
        }

        query_start_timestamp = collect_period[-1][1].timestamp()
        # Messages.time is a DoubleField (timestamp)
        for message in Messages.select().where(Messages.time >= query_start_timestamp):
            message_time_ts = message.time  # This is a float timestamp

            chat_id = None
            chat_name = None

            # Logic based on Peewee model structure, aiming to replicate original intent
            if message.chat_info_group_id:
                chat_id = f"g{message.chat_info_group_id}"
                chat_name = message.chat_info_group_name or f"群{message.chat_info_group_id}"
            elif message.user_id:
                # Fallback to sender's info for chat_id if not a group_info based chat
                # This uses the message SENDER's ID as per original logic's fallback
                chat_id = f"u{message.user_id}"  # SENDER's user_id
                chat_name = message.user_nickname  # SENDER's nickname
            else:
                # If neither group_id nor sender_id is available for chat identification
                logger.warning(
                    f"Message (PK: {message.id if hasattr(message, 'id') else 'N/A'}) lacks group_id and user_id for chat stats."
                )
                continue

            if not chat_id:  # Should not happen if above logic is correct
                continue

            # Update name_mapping
            if chat_id in self.name_mapping:
                if chat_name != self.name_mapping[chat_id][0] and message_time_ts > self.name_mapping[chat_id][1]:
                    self.name_mapping[chat_id] = (chat_name, message_time_ts)
            else:
                self.name_mapping[chat_id] = (chat_name, message_time_ts)

            for idx, (_, period_start_dt) in enumerate(collect_period):
                if message_time_ts >= period_start_dt.timestamp():
                    for period_key, _ in collect_period[idx:]:
                        stats[period_key][TOTAL_MSG_CNT] += 1
                        stats[period_key][MSG_CNT_BY_CHAT][chat_id] += 1
                    break

        return stats
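
    # 聊天ID命名示意(由上方逻辑推得,ID数字为假设):群聊消息 → chat_id = "g<group_id>"(如 "g123456"),
    # 非群聊消息回退为发送者 → chat_id = "u<user_id>"(如 "u654321");
    # name_mapping 只有在消息更新、且名称确实变化时才覆盖旧名称,以保持显示名称为最新。
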

    def _collect_hfc_data_for_period(self, collect_period: List[Tuple[str, datetime]]) -> Dict[str, Any]:
        """
        收集指定时间段的HFC统计数据
        :param collect_period: 统计时间段
        """
        if not collect_period:
            return {}

        # 为每个时间段初始化空的统计数据
        stats = {
            period_key: {
                HFC_TOTAL_CYCLES: 0,
                HFC_CYCLES_BY_CHAT: defaultdict(int),
                HFC_CYCLES_BY_ACTION: defaultdict(int),
                HFC_CYCLES_BY_VERSION: defaultdict(int),
                HFC_AVG_TIME_BY_CHAT: defaultdict(lambda: {"decision": 0, "action": 0, "total": 0}),
                HFC_AVG_TIME_BY_ACTION: defaultdict(lambda: {"decision": 0, "action": 0, "total": 0}),
                HFC_AVG_TIME_BY_VERSION: defaultdict(lambda: {"decision": 0, "action": 0, "total": 0}),
                HFC_ACTIONS_BY_CHAT: defaultdict(lambda: defaultdict(int)),  # 群聊×动作交叉统计
            }
            for period_key, _ in collect_period
        }

        try:
            import json
            from pathlib import Path

            hfc_stats_file = Path("data/hfc/time.json")
            if not hfc_stats_file.exists():
                logger.info("HFC统计文件不存在,跳过HFC统计")
                return stats

            # 读取HFC统计数据
            with open(hfc_stats_file, "r", encoding="utf-8") as f:
                hfc_data = json.load(f)

            # 处理每个chat_id和版本的统计数据
            for stats_key, chat_stats in hfc_data.items():
                chat_id = chat_stats.get("chat_id", "unknown")
                version = chat_stats.get("version", "unknown")
                last_updated_str = chat_stats.get("last_updated")

                if not last_updated_str:
                    continue

                # 解析最后更新时间
                try:
                    last_updated = datetime.fromisoformat(last_updated_str.replace("Z", "+00:00"))
                    if last_updated.tzinfo:
                        last_updated = last_updated.replace(tzinfo=None)
                except (ValueError, TypeError):
                    continue

                # 对于"全部时间",所有数据都包含
                # 对于其他时间段,只包含在时间范围内更新的数据
                applicable_periods = []
                for period_key, period_start in collect_period:
                    if period_key == "all_time" or last_updated >= period_start:
                        applicable_periods.append(period_key)

                if not applicable_periods:
                    continue

                # 处理整体统计
                overall = chat_stats.get("overall", {})
                total_records = overall.get("total_records", 0)
                avg_step_times = overall.get("avg_step_times", {})

                # 计算决策时间和动作时间
                action_time = avg_step_times.get("执行动作", 0)
                total_time = overall.get("avg_total_time", 0)
                decision_time = max(0, total_time - action_time)

                for period_key in applicable_periods:
                    stats[period_key][HFC_TOTAL_CYCLES] += total_records
                    stats[period_key][HFC_CYCLES_BY_CHAT][chat_id] += total_records
                    stats[period_key][HFC_CYCLES_BY_VERSION][version] += total_records

                # 处理按动作类型的统计
                by_action = chat_stats.get("by_action", {})
                for action_type, action_data in by_action.items():
                    count = action_data.get("count", 0)
                    action_step_times = action_data.get("avg_step_times", {})
                    action_total_time = action_data.get("avg_total_time", 0)

                    # 计算该动作类型的决策时间和动作时间
                    action_exec_time = action_step_times.get("执行动作", 0)
                    action_decision_time = max(0, action_total_time - action_exec_time)

                    for period_key in applicable_periods:
                        stats[period_key][HFC_CYCLES_BY_ACTION][action_type] += count

                        # 群聊×动作交叉统计
                        stats[period_key][HFC_ACTIONS_BY_CHAT][chat_id][action_type] += count

                        # 累加时间统计(用于后续计算加权平均)
                        # 这里扩展数据结构,额外存储累计值与计数
                        if chat_id not in stats[period_key][HFC_AVG_TIME_BY_CHAT]:
                            stats[period_key][HFC_AVG_TIME_BY_CHAT][chat_id] = {
                                "decision": 0, "action": 0, "total": 0, "count": 0
                            }
                        if action_type not in stats[period_key][HFC_AVG_TIME_BY_ACTION]:
                            stats[period_key][HFC_AVG_TIME_BY_ACTION][action_type] = {
                                "decision": 0, "action": 0, "total": 0, "count": 0
                            }
                        if version not in stats[period_key][HFC_AVG_TIME_BY_VERSION]:
                            stats[period_key][HFC_AVG_TIME_BY_VERSION][version] = {
                                "decision": 0, "action": 0, "total": 0, "count": 0
                            }

                        # 累加加权值(时间*数量)
                        stats[period_key][HFC_AVG_TIME_BY_CHAT][chat_id]["decision"] += decision_time * total_records
                        stats[period_key][HFC_AVG_TIME_BY_CHAT][chat_id]["action"] += action_time * total_records
                        stats[period_key][HFC_AVG_TIME_BY_CHAT][chat_id]["total"] += total_time * total_records
                        stats[period_key][HFC_AVG_TIME_BY_CHAT][chat_id]["count"] += total_records

                        stats[period_key][HFC_AVG_TIME_BY_ACTION][action_type]["decision"] += action_decision_time * count
                        stats[period_key][HFC_AVG_TIME_BY_ACTION][action_type]["action"] += action_exec_time * count
                        stats[period_key][HFC_AVG_TIME_BY_ACTION][action_type]["total"] += action_total_time * count
                        stats[period_key][HFC_AVG_TIME_BY_ACTION][action_type]["count"] += count

                        stats[period_key][HFC_AVG_TIME_BY_VERSION][version]["decision"] += decision_time * total_records
                        stats[period_key][HFC_AVG_TIME_BY_VERSION][version]["action"] += action_time * total_records
                        stats[period_key][HFC_AVG_TIME_BY_VERSION][version]["total"] += total_time * total_records
                        stats[period_key][HFC_AVG_TIME_BY_VERSION][version]["count"] += total_records

        except Exception as e:
            logger.error(f"收集HFC统计数据失败: {e}")

        # 计算加权平均时间
        for period_key in stats:
            for stat_type in [HFC_AVG_TIME_BY_CHAT, HFC_AVG_TIME_BY_ACTION, HFC_AVG_TIME_BY_VERSION]:
                for key, time_data in stats[period_key][stat_type].items():
                    if time_data.get("count", 0) > 0:
                        count = time_data["count"]
                        stats[period_key][stat_type][key] = {
                            "decision": time_data["decision"] / count,
                            "action": time_data["action"] / count,
                            "total": time_data["total"] / count,
                        }
                    else:
                        stats[period_key][stat_type][key] = {"decision": 0, "action": 0, "total": 0}

        return stats
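
    # 加权平均示意(假设性数字,非源数据):若动作类型 "reply" 出现在两个聊天的统计文件中,
    # 分别为 count=3、avg_total_time=2.0s 与 count=1、avg_total_time=4.0s,则累计后
    #   total = 3*2.0 + 1*4.0 = 10.0,count = 4
    # 归一化得到 avg total = 10.0 / 4 = 2.5s,与上方"计算加权平均时间"一步的除法一致。
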

    def _collect_all_statistics(self, now: datetime) -> Dict[str, Dict[str, Any]]:
        """
        收集各时间段的统计数据
        :param now: 基准当前时间
        """
        last_all_time_stat = None
        if "last_full_statistics" in local_storage:
            # 如果存在上次完整统计数据,则使用该数据进行增量统计
            last_stat = local_storage["last_full_statistics"]  # 上次完整统计数据
            self.name_mapping = last_stat["name_mapping"]  # 上次完整统计数据的名称映射
            last_all_time_stat = last_stat["stat_data"]  # 上次完整统计的统计数据
            last_stat_timestamp = datetime.fromtimestamp(last_stat["timestamp"])  # 上次完整统计数据的时间戳
            self.stat_period = [item for item in self.stat_period if item[0] != "all_time"]  # 删除"所有时间"的统计时段
            self.stat_period.append(("all_time", now - last_stat_timestamp, "自部署以来的"))

        stat_start_timestamp = [(period[0], now - period[1]) for period in self.stat_period]

        stat = {item[0]: {} for item in self.stat_period}

        model_req_stat = self._collect_model_request_for_period(stat_start_timestamp)
        online_time_stat = self._collect_online_time_for_period(stat_start_timestamp, now)
        message_count_stat = self._collect_message_count_for_period(stat_start_timestamp)
        # HFC统计数据收集
        hfc_stat = self._collect_hfc_data_for_period(stat_start_timestamp)

        # 统计数据合并
        # 合并四类统计数据
        for period_key, _ in stat_start_timestamp:
            stat[period_key].update(model_req_stat[period_key])
            stat[period_key].update(online_time_stat[period_key])
            stat[period_key].update(message_count_stat[period_key])
            stat[period_key].update(hfc_stat[period_key])

        if last_all_time_stat:
            # 若存在上次完整统计数据,则将其与当前统计数据合并
            for key, val in last_all_time_stat.items():
                # 跳过已删除的SUCCESS_RATE相关key
                if key in ["hfc_success_rate_by_chat", "hfc_success_rate_by_action", "hfc_success_rate_by_version"]:
                    continue
                # 确保当前统计数据中存在该key
                if key not in stat["all_time"]:
                    continue

                if isinstance(val, dict):
                    # 是字典类型,则进行合并
                    for sub_key, sub_val in val.items():
                        # 检查是否是HFC的嵌套字典时间数据
                        if key in [HFC_AVG_TIME_BY_CHAT, HFC_AVG_TIME_BY_ACTION, HFC_AVG_TIME_BY_VERSION] and isinstance(sub_val, dict):
                            # 对于HFC时间数据,需要特殊处理
                            if sub_key not in stat["all_time"][key]:
                                stat["all_time"][key][sub_key] = {"decision": 0, "action": 0, "total": 0}
                            # 合并嵌套的时间数据
                            for time_type, time_val in sub_val.items():
                                if time_type in stat["all_time"][key][sub_key]:
                                    stat["all_time"][key][sub_key][time_type] += time_val
                        elif key == HFC_ACTIONS_BY_CHAT and isinstance(sub_val, dict):
                            # 对于群聊×动作交叉统计的二层嵌套字典,需要特殊处理
                            if sub_key not in stat["all_time"][key]:
                                stat["all_time"][key][sub_key] = {}
                            # 合并二层嵌套的动作数据
                            for action_type, action_count in sub_val.items():
                                if action_type in stat["all_time"][key][sub_key]:
                                    stat["all_time"][key][sub_key][action_type] += action_count
                                else:
                                    stat["all_time"][key][sub_key][action_type] = action_count
                        else:
                            # 普通的数值或字典合并
                            if sub_key in stat["all_time"][key]:
                                stat["all_time"][key][sub_key] += sub_val
                            else:
                                stat["all_time"][key][sub_key] = sub_val
                else:
                    # 直接合并
                    stat["all_time"][key] += val

        # 更新上次完整统计数据的时间戳
        local_storage["last_full_statistics"] = {
            "name_mapping": self.name_mapping,
            "stat_data": stat["all_time"],
            "timestamp": now.timestamp(),
        }

        return stat
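
    # 增量合并示意(假设性数字):若上次完整统计在 T0 时刻为 all_time 记录了 TOTAL_MSG_CNT=1000,
    # 本次运行只扫描 T0 之后的记录并得到 50 条新消息,则合并后 all_time 的 TOTAL_MSG_CNT=1050;
    # 这样 "all_time" 无需每次全量扫描数据库,代价是 HFC 平均时间这类非可加字段跨轮次直接相加,
    # 结果只能视为近似值。
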

    # -- 以下为统计数据格式化方法 --

    @staticmethod
    def _format_total_stat(stats: Dict[str, Any]) -> str:
        """
        格式化总统计数据
        """
        output = [
            f"总在线时间: {_format_online_time(stats[ONLINE_TIME])}",
            f"总消息数: {stats[TOTAL_MSG_CNT]}",
            f"总请求数: {stats[TOTAL_REQ_CNT]}",
            f"总花费: {stats[TOTAL_COST]:.4f}¥",
            "",
        ]
        return "\n".join(output)

    @staticmethod
    def _format_model_classified_stat(stats: Dict[str, Any]) -> str:
        """
        格式化按模型分类的统计数据
        """
        if stats[TOTAL_REQ_CNT] <= 0:
            return ""

        data_fmt = "{:<32} {:>10} {:>12} {:>12} {:>12} {:>9.4f}¥"
        output = [
            "按模型分类统计:",
            " 模型名称 调用次数 输入Token 输出Token Token总量 累计花费",
        ]
        for model_name, count in sorted(stats[REQ_CNT_BY_MODEL].items()):
            name = f"{model_name[:29]}..." if len(model_name) > 32 else model_name
            in_tokens = stats[IN_TOK_BY_MODEL][model_name]
            out_tokens = stats[OUT_TOK_BY_MODEL][model_name]
            tokens = stats[TOTAL_TOK_BY_MODEL][model_name]
            cost = stats[COST_BY_MODEL][model_name]
            output.append(data_fmt.format(name, count, in_tokens, out_tokens, tokens, cost))

        output.append("")
        return "\n".join(output)

    def _format_chat_stat(self, stats: Dict[str, Any]) -> str:
        """
        格式化聊天统计数据
        """
        if stats[TOTAL_MSG_CNT] <= 0:
            return ""

        output = ["聊天消息统计:", " 联系人/群组名称 消息数量"]
        output.extend(
            f"{self.name_mapping[chat_id][0][:32]:<32} {count:>10}"
            for chat_id, count in sorted(stats[MSG_CNT_BY_CHAT].items())
        )
        output.append("")
        return "\n".join(output)

    def _generate_html_report(self, stat: dict[str, Any], now: datetime):
        """
        生成HTML格式的统计报告
        :param stat: 统计数据
        :param now: 基准当前时间
        :return: HTML格式的统计报告
        """
        # 注:本方法原有的HTML模板字符串在文档提取时被剥离,无法逐字还原,此处仅保留可恢复的结构说明:
        #   - 为 self.stat_period 中的每个时间段生成一个选项卡,另外追加"图表"与"HFC统计"两个选项卡;
        #   - 嵌套函数 _format_stat_data(stat_data, div_id, start_time) 将单个时间段渲染为 div 块,内容包括:
        #     统计时段(start_time ~ now)、总在线时间、总消息数、总请求数、总花费,
        #     按模型/按模块/按请求类型分类的表格(列:名称、调用次数、输入Token、输出Token、Token总量、累计花费),
        #     聊天消息统计表格(列:联系人/群组名称、消息数量),以及"统计截止时间"的落款;
        #   - 嵌套函数 _generate_chat_action_table(actions_by_chat) 生成"群聊×动作选择率"表格:
        #     行为群聊名称,列为各动作类型及"总计","no_reply"显示为"不回复",
        #     单元格为选择次数及占比,无数据时显示"暂无数据",
        #     并附说明"显示每个群聊中不同动作类型的选择次数及占比";
        #   - HFC统计选项卡依次输出按群聊、按动作类型、按版本的表格
        #     (数据来自 HFC_CYCLES_BY_* 与 HFC_AVG_TIME_BY_*,列:名称、循环次数、决策时间、动作时间、总时间),
        #     群聊名称通过 _get_chat_display_name(chat_id) 解析,
        #     并附说明"决策时间包括观察、处理、规划等步骤;动作时间是执行具体动作的时间"、
        #     "此页面显示HFC模块的性能统计信息,包括各群聊、动作类型和版本的详细数据",
        #     无数据时显示"暂无HFC数据"/"系统中还没有HFC循环记录";
        #   - 最终将各时间段 div 块(sections_html)与选项卡拼接为完整HTML,写入 self.record_file_path。
        raise NotImplementedError("原HTML模板内容在文档提取过程中丢失,此方法未能完整还原")