From acc41a6f21dffab4f80a465a94b0bd82a81c8295 Mon Sep 17 00:00:00 2001 From: SengokuCola <1026294844@qq.com> Date: Sun, 22 Jun 2025 22:26:04 +0800 Subject: [PATCH] =?UTF-8?q?fix=EF=BC=9A=E9=87=8D=E6=9E=84HFC=E7=BB=9F?= =?UTF-8?q?=E8=AE=A1=E8=BE=93=E5=87=BA?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/chat/focus_chat/hfc_performance_logger.py | 240 +-- src/chat/utils/statistic.py | 1449 ++++++++++++----- src/main.py | 8 + 3 files changed, 1111 insertions(+), 586 deletions(-) diff --git a/src/chat/focus_chat/hfc_performance_logger.py b/src/chat/focus_chat/hfc_performance_logger.py index 721830553..a540c1d76 100644 --- a/src/chat/focus_chat/hfc_performance_logger.py +++ b/src/chat/focus_chat/hfc_performance_logger.py @@ -17,12 +17,10 @@ class HFCPerformanceLogger: self.chat_id = chat_id self.version = version or self.INTERNAL_VERSION self.log_dir = Path("log/hfc_loop") - self.data_dir = Path("data/hfc") self.session_start_time = datetime.now() # 确保目录存在 self.log_dir.mkdir(parents=True, exist_ok=True) - self.data_dir.mkdir(parents=True, exist_ok=True) # 当前会话的日志文件,包含版本号 version_suffix = self.version.replace(".", "_") @@ -31,11 +29,9 @@ class HFCPerformanceLogger: ) self.current_session_data = [] - # 统计数据文件 - self.stats_file = self.data_dir / "time.json" - # 初始化时计算历史统计数据 - self._update_historical_stats() + + def record_cycle(self, cycle_data: Dict[str, Any]): """记录单次循环数据""" @@ -74,165 +70,95 @@ class HFCPerformanceLogger: except Exception as e: logger.error(f"写入会话数据失败: {e}") - def _update_historical_stats(self): - """更新历史统计数据""" - try: - # 读取所有历史会话文件 - all_records = [] - - # 读取当前chat_id的所有历史文件(包括不同版本) - for file_path in self.log_dir.glob(f"{self.chat_id}_*.json"): - if file_path == self.session_file: - continue # 跳过当前会话文件 - - try: - with open(file_path, "r", encoding="utf-8") as f: - records = json.load(f) - if isinstance(records, list): - all_records.extend(records) - except Exception as e: - logger.warning(f"读取历史文件 {file_path} 失败: {e}") - - if not all_records: - logger.info(f"没有找到 chat_id={self.chat_id} 的历史数据") - return - - # 计算统计数据 - stats = self._calculate_stats(all_records) - - # 更新统计文件 - self._update_stats_file(stats) - - logger.info(f"更新了 chat_id={self.chat_id} 的历史统计数据,共 {len(all_records)} 条记录") - - except Exception as e: - logger.error(f"更新历史统计数据失败: {e}") - - def _calculate_stats(self, records: List[Dict[str, Any]]) -> Dict[str, Any]: - """计算统计数据""" - if not records: + def get_current_session_stats(self) -> Dict[str, Any]: + """获取当前会话的基本信息""" + if not self.current_session_data: return {} - # 按动作类型分组 - action_groups = {} - total_times = [] - step_time_totals = {} - - for record in records: - action_type = record.get("action_type", "unknown") - total_time = record.get("total_time", 0) - step_times = record.get("step_times", {}) - - if action_type not in action_groups: - action_groups[action_type] = {"count": 0, "total_times": [], "step_times": {}} - - action_groups[action_type]["count"] += 1 - action_groups[action_type]["total_times"].append(total_time) - total_times.append(total_time) - - # 记录步骤时间 - for step_name, step_time in step_times.items(): - if step_name not in action_groups[action_type]["step_times"]: - action_groups[action_type]["step_times"][step_name] = [] - action_groups[action_type]["step_times"][step_name].append(step_time) - - if step_name not in step_time_totals: - step_time_totals[step_name] = [] - step_time_totals[step_name].append(step_time) - - # 计算各种平均值和比例 - total_records = len(records) - - # 整体统计 - overall_stats = { - "total_records": total_records, - "avg_total_time": sum(total_times) / len(total_times) if total_times else 0, - "avg_step_times": {}, - } - - # 各步骤平均时间 - for step_name, times in step_time_totals.items(): - overall_stats["avg_step_times"][step_name] = sum(times) / len(times) if times else 0 - - # 按动作类型统计 - action_stats = {} - for action_type, data in action_groups.items(): - action_stats[action_type] = { - "count": data["count"], - "percentage": (data["count"] / total_records) * 100, - "avg_total_time": sum(data["total_times"]) / len(data["total_times"]) if data["total_times"] else 0, - "avg_step_times": {}, - } - - # 该动作各步骤平均时间 - for step_name, times in data["step_times"].items(): - action_stats[action_type]["avg_step_times"][step_name] = sum(times) / len(times) if times else 0 - return { "chat_id": self.chat_id, "version": self.version, - "last_updated": datetime.now().isoformat(), - "overall": overall_stats, - "by_action": action_stats, + "session_file": str(self.session_file), + "record_count": len(self.current_session_data), + "start_time": self.session_start_time.isoformat() } - def _update_stats_file(self, new_stats: Dict[str, Any]): - """更新统计文件""" - try: - # 读取现有统计数据 - existing_stats = {} - if self.stats_file.exists(): - with open(self.stats_file, "r", encoding="utf-8") as f: - existing_stats = json.load(f) - - # 更新当前chat_id和版本的统计数据 - stats_key = f"{self.chat_id}_{self.version}" - existing_stats[stats_key] = new_stats - - # 写回文件 - with open(self.stats_file, "w", encoding="utf-8") as f: - json.dump(existing_stats, f, ensure_ascii=False, indent=2) - - except Exception as e: - logger.error(f"更新统计文件失败: {e}") - - def get_current_session_stats(self) -> Dict[str, Any]: - """获取当前会话的统计数据""" - if not self.current_session_data: - return {} - - return self._calculate_stats(self.current_session_data) - def finalize_session(self): - """结束会话,进行最终统计""" + """结束会话""" try: if self.current_session_data: - # 计算当前会话统计数据 - self._calculate_stats(self.current_session_data) - - # 合并历史数据重新计算总体统计 - all_records = self.current_session_data[:] - - # 读取历史数据 - for file_path in self.log_dir.glob(f"{self.chat_id}_*.json"): - if file_path == self.session_file: - continue - - try: - with open(file_path, "r", encoding="utf-8") as f: - records = json.load(f) - if isinstance(records, list): - all_records.extend(records) - except Exception as e: - logger.warning(f"读取历史文件 {file_path} 失败: {e}") - - # 重新计算总体统计 - total_stats = self._calculate_stats(all_records) - self._update_stats_file(total_stats) - - logger.info( - f"完成会话统计,当前会话 {len(self.current_session_data)} 条记录,总共 {len(all_records)} 条记录" - ) - + logger.info(f"完成会话,当前会话 {len(self.current_session_data)} 条记录") except Exception as e: - logger.error(f"结束会话统计失败: {e}") + logger.error(f"结束会话失败: {e}") + + @classmethod + def cleanup_old_logs(cls, max_size_mb: float = 50.0): + """ + 清理旧的HFC日志文件,保持目录大小在指定限制内 + + Args: + max_size_mb: 最大目录大小限制(MB) + """ + log_dir = Path("log/hfc_loop") + if not log_dir.exists(): + logger.info("HFC日志目录不存在,跳过日志清理") + return + + # 获取所有日志文件及其信息 + log_files = [] + total_size = 0 + + for log_file in log_dir.glob("*.json"): + try: + file_stat = log_file.stat() + log_files.append({ + 'path': log_file, + 'size': file_stat.st_size, + 'mtime': file_stat.st_mtime + }) + total_size += file_stat.st_size + except Exception as e: + logger.warning(f"无法获取文件信息 {log_file}: {e}") + + if not log_files: + logger.info("没有找到HFC日志文件") + return + + max_size_bytes = max_size_mb * 1024 * 1024 + current_size_mb = total_size / (1024 * 1024) + + logger.info(f"HFC日志目录当前大小: {current_size_mb:.2f}MB,限制: {max_size_mb}MB") + + if total_size <= max_size_bytes: + logger.info("HFC日志目录大小在限制范围内,无需清理") + return + + # 按修改时间排序(最早的在前面) + log_files.sort(key=lambda x: x['mtime']) + + deleted_count = 0 + deleted_size = 0 + + for file_info in log_files: + if total_size <= max_size_bytes: + break + + try: + file_size = file_info['size'] + file_path = file_info['path'] + + file_path.unlink() + total_size -= file_size + deleted_size += file_size + deleted_count += 1 + + logger.info(f"删除旧日志文件: {file_path.name} ({file_size / 1024:.1f}KB)") + + except Exception as e: + logger.error(f"删除日志文件失败 {file_info['path']}: {e}") + + final_size_mb = total_size / (1024 * 1024) + deleted_size_mb = deleted_size / (1024 * 1024) + + logger.info(f"HFC日志清理完成: 删除了{deleted_count}个文件,释放{deleted_size_mb:.2f}MB空间") + logger.info(f"清理后目录大小: {final_size_mb:.2f}MB") diff --git a/src/chat/utils/statistic.py b/src/chat/utils/statistic.py index 01d85b981..63715f5d7 100644 --- a/src/chat/utils/statistic.py +++ b/src/chat/utils/statistic.py @@ -3,6 +3,9 @@ from datetime import datetime, timedelta from typing import Any, Dict, Tuple, List import asyncio import concurrent.futures +import json +import os +import glob from src.common.logger import get_logger @@ -14,16 +17,6 @@ from src.manager.local_store_manager import local_storage logger = get_logger("maibot_statistic") -# HFC统计相关的键 -HFC_TOTAL_CYCLES = "hfc_total_cycles" -HFC_CYCLES_BY_CHAT = "hfc_cycles_by_chat" -HFC_CYCLES_BY_ACTION = "hfc_cycles_by_action" -HFC_CYCLES_BY_VERSION = "hfc_cycles_by_version" -HFC_AVG_TIME_BY_CHAT = "hfc_avg_time_by_chat" -HFC_AVG_TIME_BY_ACTION = "hfc_avg_time_by_action" -HFC_AVG_TIME_BY_VERSION = "hfc_avg_time_by_version" -HFC_ACTIONS_BY_CHAT = "hfc_actions_by_chat" # 群聊×动作交叉统计 - # 统计数据的键 TOTAL_REQ_CNT = "total_requests" TOTAL_COST = "total_cost" @@ -51,6 +44,20 @@ ONLINE_TIME = "online_time" TOTAL_MSG_CNT = "total_messages" MSG_CNT_BY_CHAT = "messages_by_chat" +# Focus统计数据的键 +FOCUS_TOTAL_CYCLES = "focus_total_cycles" +FOCUS_AVG_TIMES_BY_STAGE = "focus_avg_times_by_stage" +FOCUS_ACTION_RATIOS = "focus_action_ratios" +FOCUS_CYCLE_CNT_BY_CHAT = "focus_cycle_count_by_chat" +FOCUS_CYCLE_CNT_BY_ACTION = "focus_cycle_count_by_action" +FOCUS_AVG_TIMES_BY_CHAT_ACTION = "focus_avg_times_by_chat_action" +FOCUS_AVG_TIMES_BY_ACTION = "focus_avg_times_by_action" +FOCUS_TOTAL_TIME_BY_CHAT = "focus_total_time_by_chat" +FOCUS_TOTAL_TIME_BY_ACTION = "focus_total_time_by_action" +FOCUS_CYCLE_CNT_BY_VERSION = "focus_cycle_count_by_version" +FOCUS_ACTION_RATIOS_BY_VERSION = "focus_action_ratios_by_version" +FOCUS_AVG_TIMES_BY_VERSION = "focus_avg_times_by_version" + class OnlineTimeRecordTask(AsyncTask): """在线时间记录任务""" @@ -190,6 +197,8 @@ class StatisticOutputTask(AsyncTask): self._format_model_classified_stat(stats["last_hour"]), "", self._format_chat_stat(stats["last_hour"]), + "", + self._format_focus_stat(stats["last_hour"]), self.SEP_LINE, "", ] @@ -458,148 +467,187 @@ class StatisticOutputTask(AsyncTask): break return stats - def _collect_hfc_data_for_period(self, collect_period: List[Tuple[str, datetime]]) -> Dict[str, Any]: + def _collect_focus_statistics_for_period(self, collect_period: List[Tuple[str, datetime]]) -> Dict[str, Any]: """ - 收集指定时间段的HFC统计数据 + 收集指定时间段的Focus统计数据 :param collect_period: 统计时间段 """ if not collect_period: return {} - # 为每个时间段初始化空的统计数据 + collect_period.sort(key=lambda x: x[1], reverse=True) + stats = { period_key: { - HFC_TOTAL_CYCLES: 0, - HFC_CYCLES_BY_CHAT: defaultdict(int), - HFC_CYCLES_BY_ACTION: defaultdict(int), - HFC_CYCLES_BY_VERSION: defaultdict(int), - HFC_AVG_TIME_BY_CHAT: defaultdict(lambda: {"decision": 0, "action": 0, "total": 0}), - HFC_AVG_TIME_BY_ACTION: defaultdict(lambda: {"decision": 0, "action": 0, "total": 0}), - HFC_AVG_TIME_BY_VERSION: defaultdict(lambda: {"decision": 0, "action": 0, "total": 0}), - HFC_ACTIONS_BY_CHAT: defaultdict(lambda: defaultdict(int)), # 群聊×动作交叉统计 - + FOCUS_TOTAL_CYCLES: 0, + FOCUS_AVG_TIMES_BY_STAGE: defaultdict(list), + FOCUS_ACTION_RATIOS: defaultdict(int), + FOCUS_CYCLE_CNT_BY_CHAT: defaultdict(int), + FOCUS_CYCLE_CNT_BY_ACTION: defaultdict(int), + FOCUS_AVG_TIMES_BY_CHAT_ACTION: defaultdict(lambda: defaultdict(list)), + FOCUS_AVG_TIMES_BY_ACTION: defaultdict(lambda: defaultdict(list)), + "focus_exec_times_by_chat_action": defaultdict(lambda: defaultdict(list)), + FOCUS_TOTAL_TIME_BY_CHAT: defaultdict(float), + FOCUS_TOTAL_TIME_BY_ACTION: defaultdict(float), + FOCUS_CYCLE_CNT_BY_VERSION: defaultdict(int), + FOCUS_ACTION_RATIOS_BY_VERSION: defaultdict(lambda: defaultdict(int)), + FOCUS_AVG_TIMES_BY_VERSION: defaultdict(lambda: defaultdict(list)), + "focus_exec_times_by_version_action": defaultdict(lambda: defaultdict(list)), + "focus_action_ratios_by_chat": defaultdict(lambda: defaultdict(int)), } for period_key, _ in collect_period } - try: - import json - from pathlib import Path - - hfc_stats_file = Path("data/hfc/time.json") - if not hfc_stats_file.exists(): - logger.info("HFC统计文件不存在,跳过HFC统计") - return stats + # 获取 log/hfc_loop 目录下的所有 json 文件 + log_dir = "log/hfc_loop" + if not os.path.exists(log_dir): + logger.warning(f"Focus log directory {log_dir} does not exist") + return stats - # 读取HFC统计数据 - with open(hfc_stats_file, 'r', encoding='utf-8') as f: - hfc_data = json.load(f) + json_files = glob.glob(os.path.join(log_dir, "*.json")) + query_start_time = collect_period[-1][1] - # 处理每个chat_id和版本的统计数据 - for stats_key, chat_stats in hfc_data.items(): - chat_id = chat_stats.get("chat_id", "unknown") - version = chat_stats.get("version", "unknown") - last_updated_str = chat_stats.get("last_updated") - - if not last_updated_str: - continue - - # 解析最后更新时间 - try: - last_updated = datetime.fromisoformat(last_updated_str.replace('Z', '+00:00')) - if last_updated.tzinfo: - last_updated = last_updated.replace(tzinfo=None) - except: - continue - - # 对于"全部时间",所有数据都包含 - # 对于其他时间段,只包含在时间范围内更新的数据 - applicable_periods = [] - for period_key, period_start in collect_period: - if period_key == "all_time" or last_updated >= period_start: - applicable_periods.append(period_key) - - if not applicable_periods: - continue - - # 处理整体统计 - overall = chat_stats.get("overall", {}) - total_records = overall.get("total_records", 0) - avg_step_times = overall.get("avg_step_times", {}) - - # 计算决策时间和动作时间 - action_time = avg_step_times.get("执行动作", 0) - total_time = overall.get("avg_total_time", 0) - decision_time = max(0, total_time - action_time) - - for period_key in applicable_periods: - stats[period_key][HFC_TOTAL_CYCLES] += total_records - stats[period_key][HFC_CYCLES_BY_CHAT][chat_id] += total_records - stats[period_key][HFC_CYCLES_BY_VERSION][version] += total_records - - # 处理按动作类型的统计 - by_action = chat_stats.get("by_action", {}) - for action_type, action_data in by_action.items(): - count = action_data.get("count", 0) - action_step_times = action_data.get("avg_step_times", {}) - action_total_time = action_data.get("avg_total_time", 0) + for json_file in json_files: + try: + # 从文件名解析时间戳 (格式: hash_version_date_time.json) + filename = os.path.basename(json_file) + name_parts = filename.replace('.json', '').split('_') + if len(name_parts) >= 4: + date_str = name_parts[-2] # YYYYMMDD + time_str = name_parts[-1] # HHMMSS + file_time_str = f"{date_str}_{time_str}" + file_time = datetime.strptime(file_time_str, "%Y%m%d_%H%M%S") - # 计算该动作类型的决策时间和动作时间 - action_exec_time = action_step_times.get("执行动作", 0) - action_decision_time = max(0, action_total_time - action_exec_time) + # 如果文件时间在查询范围内,则处理该文件 + if file_time >= query_start_time: + with open(json_file, 'r', encoding='utf-8') as f: + cycles_data = json.load(f) + self._process_focus_file_data(cycles_data, stats, collect_period, file_time) + except Exception as e: + logger.warning(f"Failed to process focus file {json_file}: {e}") + continue - for period_key in applicable_periods: - stats[period_key][HFC_CYCLES_BY_ACTION][action_type] += count - - # 群聊×动作交叉统计 - stats[period_key][HFC_ACTIONS_BY_CHAT][chat_id][action_type] += count - - # 累加时间统计(用于后续计算加权平均) - # 这里我们需要重新设计数据结构来存储累计值 - if chat_id not in stats[period_key][HFC_AVG_TIME_BY_CHAT]: - stats[period_key][HFC_AVG_TIME_BY_CHAT][chat_id] = {"decision": 0, "action": 0, "total": 0, "count": 0} - if action_type not in stats[period_key][HFC_AVG_TIME_BY_ACTION]: - stats[period_key][HFC_AVG_TIME_BY_ACTION][action_type] = {"decision": 0, "action": 0, "total": 0, "count": 0} - if version not in stats[period_key][HFC_AVG_TIME_BY_VERSION]: - stats[period_key][HFC_AVG_TIME_BY_VERSION][version] = {"decision": 0, "action": 0, "total": 0, "count": 0} - - # 累加加权值(时间*数量) - stats[period_key][HFC_AVG_TIME_BY_CHAT][chat_id]["decision"] += decision_time * total_records - stats[period_key][HFC_AVG_TIME_BY_CHAT][chat_id]["action"] += action_time * total_records - stats[period_key][HFC_AVG_TIME_BY_CHAT][chat_id]["total"] += total_time * total_records - stats[period_key][HFC_AVG_TIME_BY_CHAT][chat_id]["count"] += total_records - - stats[period_key][HFC_AVG_TIME_BY_ACTION][action_type]["decision"] += action_decision_time * count - stats[period_key][HFC_AVG_TIME_BY_ACTION][action_type]["action"] += action_exec_time * count - stats[period_key][HFC_AVG_TIME_BY_ACTION][action_type]["total"] += action_total_time * count - stats[period_key][HFC_AVG_TIME_BY_ACTION][action_type]["count"] += count - - stats[period_key][HFC_AVG_TIME_BY_VERSION][version]["decision"] += decision_time * total_records - stats[period_key][HFC_AVG_TIME_BY_VERSION][version]["action"] += action_time * total_records - stats[period_key][HFC_AVG_TIME_BY_VERSION][version]["total"] += total_time * total_records - stats[period_key][HFC_AVG_TIME_BY_VERSION][version]["count"] += total_records - - except Exception as e: - logger.error(f"收集HFC统计数据失败: {e}") - - # 只对非all_time时段计算加权平均时间,all_time需要在历史数据合并后再计算 - for period_key in stats: - if period_key != "all_time": # 跳过all_time,等历史数据合并后再计算 - for stat_type in [HFC_AVG_TIME_BY_CHAT, HFC_AVG_TIME_BY_ACTION, HFC_AVG_TIME_BY_VERSION]: - for key, time_data in stats[period_key][stat_type].items(): - if time_data.get("count", 0) > 0: - count = time_data["count"] - stats[period_key][stat_type][key] = { - "decision": time_data["decision"] / count, - "action": time_data["action"] / count, - "total": time_data["total"] / count - } - else: - stats[period_key][stat_type][key] = {"decision": 0, "action": 0, "total": 0} - + # 计算平均值 + self._calculate_focus_averages(stats) return stats + def _process_focus_file_data(self, cycles_data: List[Dict], stats: Dict[str, Any], + collect_period: List[Tuple[str, datetime]], file_time: datetime): + """ + 处理单个focus文件的数据 + """ + for cycle_data in cycles_data: + try: + # 解析时间戳 + timestamp_str = cycle_data.get("timestamp", "") + if timestamp_str: + cycle_time = datetime.fromisoformat(timestamp_str.replace('Z', '+00:00')) + else: + cycle_time = file_time # 使用文件时间作为后备 + + chat_id = cycle_data.get("chat_id", "unknown") + action_type = cycle_data.get("action_type", "unknown") + total_time = cycle_data.get("total_time", 0.0) + step_times = cycle_data.get("step_times", {}) + version = cycle_data.get("version", "unknown") + + # 更新聊天ID名称映射 + if chat_id not in self.name_mapping: + # 尝试获取实际的聊天名称 + display_name = self._get_chat_display_name_from_id(chat_id) + self.name_mapping[chat_id] = (display_name, cycle_time.timestamp()) + + # 对每个时间段进行统计 + for idx, (_, period_start) in enumerate(collect_period): + if cycle_time >= period_start: + for period_key, _ in collect_period[idx:]: + stat = stats[period_key] + + # 基础统计 + stat[FOCUS_TOTAL_CYCLES] += 1 + stat[FOCUS_ACTION_RATIOS][action_type] += 1 + stat[FOCUS_CYCLE_CNT_BY_CHAT][chat_id] += 1 + stat[FOCUS_CYCLE_CNT_BY_ACTION][action_type] += 1 + stat["focus_action_ratios_by_chat"][chat_id][action_type] += 1 + stat[FOCUS_TOTAL_TIME_BY_CHAT][chat_id] += total_time + stat[FOCUS_TOTAL_TIME_BY_ACTION][action_type] += total_time + + # 版本统计 + stat[FOCUS_CYCLE_CNT_BY_VERSION][version] += 1 + stat[FOCUS_ACTION_RATIOS_BY_VERSION][version][action_type] += 1 + + # 阶段时间统计 + for stage, time_val in step_times.items(): + stat[FOCUS_AVG_TIMES_BY_STAGE][stage].append(time_val) + stat[FOCUS_AVG_TIMES_BY_CHAT_ACTION][chat_id][stage].append(time_val) + stat[FOCUS_AVG_TIMES_BY_ACTION][action_type][stage].append(time_val) + stat[FOCUS_AVG_TIMES_BY_VERSION][version][stage].append(time_val) + + # 专门收集执行动作阶段的时间,按聊天流和action类型分组 + if stage == "执行动作": + stat["focus_exec_times_by_chat_action"][chat_id][action_type].append(time_val) + # 按版本和action类型收集执行时间 + stat["focus_exec_times_by_version_action"][version][action_type].append(time_val) + break + except Exception as e: + logger.warning(f"Failed to process cycle data: {e}") + continue + + def _calculate_focus_averages(self, stats: Dict[str, Any]): + """ + 计算Focus统计的平均值 + """ + for period_key, stat in stats.items(): + # 计算全局阶段平均时间 + for stage, times in stat[FOCUS_AVG_TIMES_BY_STAGE].items(): + if times: + stat[FOCUS_AVG_TIMES_BY_STAGE][stage] = sum(times) / len(times) + else: + stat[FOCUS_AVG_TIMES_BY_STAGE][stage] = 0.0 + + # 计算按chat_id和action_type的阶段平均时间 + for chat_id, stage_times in stat[FOCUS_AVG_TIMES_BY_CHAT_ACTION].items(): + for stage, times in stage_times.items(): + if times: + stat[FOCUS_AVG_TIMES_BY_CHAT_ACTION][chat_id][stage] = sum(times) / len(times) + else: + stat[FOCUS_AVG_TIMES_BY_CHAT_ACTION][chat_id][stage] = 0.0 + + # 计算按action_type的阶段平均时间 + for action_type, stage_times in stat[FOCUS_AVG_TIMES_BY_ACTION].items(): + for stage, times in stage_times.items(): + if times: + stat[FOCUS_AVG_TIMES_BY_ACTION][action_type][stage] = sum(times) / len(times) + else: + stat[FOCUS_AVG_TIMES_BY_ACTION][action_type][stage] = 0.0 + + # 计算按聊天流和action类型的执行时间平均值 + for chat_id, action_times in stat["focus_exec_times_by_chat_action"].items(): + for action_type, times in action_times.items(): + if times: + stat["focus_exec_times_by_chat_action"][chat_id][action_type] = sum(times) / len(times) + else: + stat["focus_exec_times_by_chat_action"][chat_id][action_type] = 0.0 + + # 计算按版本的阶段平均时间 + for version, stage_times in stat[FOCUS_AVG_TIMES_BY_VERSION].items(): + for stage, times in stage_times.items(): + if times: + stat[FOCUS_AVG_TIMES_BY_VERSION][version][stage] = sum(times) / len(times) + else: + stat[FOCUS_AVG_TIMES_BY_VERSION][version][stage] = 0.0 + + # 计算按版本和action类型的执行时间平均值 + for version, action_times in stat["focus_exec_times_by_version_action"].items(): + for action_type, times in action_times.items(): + if times: + stat["focus_exec_times_by_version_action"][version][action_type] = sum(times) / len(times) + else: + stat["focus_exec_times_by_version_action"][version][action_type] = 0.0 + + + def _collect_all_statistics(self, now: datetime) -> Dict[str, Dict[str, Any]]: """ 收集各时间段的统计数据 @@ -625,9 +673,7 @@ class StatisticOutputTask(AsyncTask): model_req_stat = self._collect_model_request_for_period(stat_start_timestamp) online_time_stat = self._collect_online_time_for_period(stat_start_timestamp, now) message_count_stat = self._collect_message_count_for_period(stat_start_timestamp) - - # HFC统计数据收集 - hfc_stat = self._collect_hfc_data_for_period(stat_start_timestamp) + focus_stat = self._collect_focus_statistics_for_period(stat_start_timestamp) # 统计数据合并 # 合并四类统计数据 @@ -635,15 +681,11 @@ class StatisticOutputTask(AsyncTask): stat[period_key].update(model_req_stat[period_key]) stat[period_key].update(online_time_stat[period_key]) stat[period_key].update(message_count_stat[period_key]) - stat[period_key].update(hfc_stat[period_key]) + stat[period_key].update(focus_stat[period_key]) if last_all_time_stat: # 若存在上次完整统计数据,则将其与当前统计数据合并 for key, val in last_all_time_stat.items(): - # 跳过已删除的SUCCESS_RATE相关key - if key in ["hfc_success_rate_by_chat", "hfc_success_rate_by_action", "hfc_success_rate_by_version"]: - continue - # 确保当前统计数据中存在该key if key not in stat["all_time"]: continue @@ -651,69 +693,54 @@ class StatisticOutputTask(AsyncTask): if isinstance(val, dict): # 是字典类型,则进行合并 for sub_key, sub_val in val.items(): - # 检查是否是HFC的嵌套字典时间数据 - if key in [HFC_AVG_TIME_BY_CHAT, HFC_AVG_TIME_BY_ACTION, HFC_AVG_TIME_BY_VERSION] and isinstance(sub_val, dict): - # 对于HFC时间数据,需要特殊处理 - if sub_key not in stat["all_time"][key]: - stat["all_time"][key][sub_key] = {"decision": 0, "action": 0, "total": 0, "count": 0} - - # 如果历史数据是已经计算过的平均值(没有count字段),需要跳过或重新处理 - if "count" not in sub_val: - logger.debug(f"历史数据{key}.{sub_key}是平均值格式,跳过合并以避免错误计算") - continue - - # 合并累计的加权时间数据 - for time_type, time_val in sub_val.items(): - if time_type in stat["all_time"][key][sub_key]: - stat["all_time"][key][sub_key][time_type] += time_val - elif key == HFC_ACTIONS_BY_CHAT and isinstance(sub_val, dict): - # 对于群聊×动作交叉统计的二层嵌套字典,需要特殊处理 - if sub_key not in stat["all_time"][key]: - stat["all_time"][key][sub_key] = {} - - # 合并二层嵌套的动作数据 - for action_type, action_count in sub_val.items(): - if action_type in stat["all_time"][key][sub_key]: - stat["all_time"][key][sub_key][action_type] += action_count - else: - stat["all_time"][key][sub_key][action_type] = action_count - else: - # 普通的数值或字典合并 - if sub_key in stat["all_time"][key]: - stat["all_time"][key][sub_key] += sub_val + # 普通的数值或字典合并 + if sub_key in stat["all_time"][key]: + # 检查是否为嵌套的字典类型(如版本统计) + if isinstance(sub_val, dict) and isinstance(stat["all_time"][key][sub_key], dict): + # 合并嵌套字典 + for nested_key, nested_val in sub_val.items(): + if nested_key in stat["all_time"][key][sub_key]: + stat["all_time"][key][sub_key][nested_key] += nested_val + else: + stat["all_time"][key][sub_key][nested_key] = nested_val else: - stat["all_time"][key][sub_key] = sub_val + # 普通数值累加 + stat["all_time"][key][sub_key] += sub_val + else: + stat["all_time"][key][sub_key] = sub_val else: # 直接合并 stat["all_time"][key] += val - # 为all_time计算正确的平均时间(在历史数据合并后) - if "all_time" in stat: - for stat_type in [HFC_AVG_TIME_BY_CHAT, HFC_AVG_TIME_BY_ACTION, HFC_AVG_TIME_BY_VERSION]: - if stat_type in stat["all_time"]: - for key, time_data in stat["all_time"][stat_type].items(): - if time_data.get("count", 0) > 0: - count = time_data["count"] - # 计算平均值,但保留count字段用于下次合并 - avg_data = { - "decision": time_data["decision"] / count, - "action": time_data["action"] / count, - "total": time_data["total"] / count, - "count": count # 保留count字段 - } - stat["all_time"][stat_type][key] = avg_data - else: - stat["all_time"][stat_type][key] = {"decision": 0, "action": 0, "total": 0, "count": 0} - # 更新上次完整统计数据的时间戳 + # 将所有defaultdict转换为普通dict以避免类型冲突 + clean_stat_data = self._convert_defaultdict_to_dict(stat["all_time"]) local_storage["last_full_statistics"] = { "name_mapping": self.name_mapping, - "stat_data": stat["all_time"], + "stat_data": clean_stat_data, "timestamp": now.timestamp(), } return stat + def _convert_defaultdict_to_dict(self, data): + """递归转换defaultdict为普通dict""" + if isinstance(data, defaultdict): + # 转换defaultdict为普通dict + result = {} + for key, value in data.items(): + result[key] = self._convert_defaultdict_to_dict(value) + return result + elif isinstance(data, dict): + # 递归处理普通dict + result = {} + for key, value in data.items(): + result[key] = self._convert_defaultdict_to_dict(value) + return result + else: + # 其他类型直接返回 + return data + # -- 以下为统计数据格式化方法 -- @staticmethod @@ -770,6 +797,75 @@ class StatisticOutputTask(AsyncTask): output.append("") return "\n".join(output) + def _format_focus_stat(self, stats: Dict[str, Any]) -> str: + """ + 格式化Focus统计数据 + """ + if stats[FOCUS_TOTAL_CYCLES] <= 0: + return "" + + output = [ + "Focus系统统计:", + f"总循环数: {stats[FOCUS_TOTAL_CYCLES]}", + "" + ] + + # 全局阶段平均时间 + if stats[FOCUS_AVG_TIMES_BY_STAGE]: + output.append("全局阶段平均时间:") + for stage, avg_time in stats[FOCUS_AVG_TIMES_BY_STAGE].items(): + output.append(f" {stage}: {avg_time:.3f}秒") + output.append("") + + # Action类型比例 + if stats[FOCUS_ACTION_RATIOS]: + total_actions = sum(stats[FOCUS_ACTION_RATIOS].values()) + output.append("Action类型分布:") + for action_type, count in sorted(stats[FOCUS_ACTION_RATIOS].items()): + ratio = (count / total_actions) * 100 if total_actions > 0 else 0 + output.append(f" {action_type}: {count} ({ratio:.1f}%)") + output.append("") + + # 按Chat统计(仅显示前10个) + if stats[FOCUS_CYCLE_CNT_BY_CHAT]: + output.append("按聊天流统计 (前10):") + sorted_chats = sorted(stats[FOCUS_CYCLE_CNT_BY_CHAT].items(), key=lambda x: x[1], reverse=True)[:10] + for chat_id, count in sorted_chats: + chat_name = self.name_mapping.get(chat_id, (chat_id, 0))[0] + output.append(f" {chat_name[:30]}: {count} 循环") + output.append("") + + return "\n".join(output) + + def _get_chat_display_name_from_id(self, chat_id: str) -> str: + """从chat_id获取显示名称""" + try: + # 首先尝试从chat_stream获取真实群组名称 + from src.chat.message_receive.chat_stream import get_chat_manager + chat_manager = get_chat_manager() + + if chat_id in chat_manager.streams: + stream = chat_manager.streams[chat_id] + if stream.group_info and hasattr(stream.group_info, 'group_name'): + group_name = stream.group_info.group_name + if group_name and group_name.strip(): + return group_name.strip() + elif stream.user_info and hasattr(stream.user_info, 'user_nickname'): + user_name = stream.user_info.user_nickname + if user_name and user_name.strip(): + return user_name.strip() + + # 如果从chat_stream获取失败,尝试解析chat_id格式 + if chat_id.startswith('g'): + return f"群聊{chat_id[1:]}" + elif chat_id.startswith('u'): + return f"用户{chat_id[1:]}" + else: + return chat_id + except Exception as e: + logger.warning(f"获取聊天显示名称失败: {e}") + return chat_id + def _generate_html_report(self, stat: dict[str, Any], now: datetime): """ 生成HTML格式的统计报告 @@ -782,10 +878,10 @@ class StatisticOutputTask(AsyncTask): f'' for period in self.stat_period ] - # 添加图表选项卡 + # 添加Focus统计、版本对比和图表选项卡 + tab_list.append('') + tab_list.append('') tab_list.append('') - # 添加HFC统计选项卡 - tab_list.append('') def _format_stat_data(stat_data: dict[str, Any], div_id: str, start_time: datetime) -> str: """ @@ -846,6 +942,43 @@ class StatisticOutputTask(AsyncTask): for chat_id, count in sorted(stat_data[MSG_CNT_BY_CHAT].items()) ] ) + + # Focus统计数据 + focus_action_rows = "" + focus_chat_rows = "" + focus_stage_rows = "" + focus_action_stage_rows = "" + + if stat_data.get(FOCUS_TOTAL_CYCLES, 0) > 0: + # Action类型统计 + total_actions = sum(stat_data[FOCUS_ACTION_RATIOS].values()) if stat_data[FOCUS_ACTION_RATIOS] else 0 + focus_action_rows = "\n".join([ + f"
统计时段: {time_range}
+总循环数: {stat_data.get(FOCUS_TOTAL_CYCLES, 0)}
+ +| 阶段 | 平均时间 |
|---|
| Action类型 | 次数 | 占比 |
|---|
在指定时间段内未找到任何Focus循环数据。
+请确保 log/hfc_loop/ 目录下存在相应的JSON文件。
+ 数据来源: log/hfc_loop/ 目录下的JSON文件
+ 统计内容: 各时间段的Focus循环性能分析
+
统计时段: {time_range}
+包含版本: {len(all_versions)} 个版本
+ +在指定时间段内未找到任何版本信息。
+请确保 log/hfc_loop/ 目录下的JSON文件包含版本信息。
+ 对比内容: 不同版本的Action类型分布和各阶段性能表现
+ 数据来源: log/hfc_loop/ 目录下JSON文件中的version字段
+
暂无HFC数据
" - - def _generate_chat_action_table(actions_by_chat): - """生成群聊×动作选择率表格""" - if not actions_by_chat: - return "暂无数据
" - - # 获取所有动作类型 - all_actions = set() - for chat_actions in actions_by_chat.values(): - all_actions.update(chat_actions.keys()) - - if not all_actions: - return "暂无数据
" - - all_actions = sorted(all_actions) - - # 生成表头 - action_headers = "" - for action in all_actions: - action_display = action - if action == "no_reply": - action_display = "不回复" - action_headers += f"| 群聊名称 | {action_headers}总计 |
|---|
说明:显示每个群聊中不同动作类型的选择次数及占比。
- """ - - cycles_by_chat = data.get(HFC_CYCLES_BY_CHAT, {}) - cycles_by_action = data.get(HFC_CYCLES_BY_ACTION, {}) - cycles_by_version = data.get(HFC_CYCLES_BY_VERSION, {}) - avg_time_by_chat = data.get(HFC_AVG_TIME_BY_CHAT, {}) - avg_time_by_action = data.get(HFC_AVG_TIME_BY_ACTION, {}) - avg_time_by_version = data.get(HFC_AVG_TIME_BY_VERSION, {}) - actions_by_chat = data.get(HFC_ACTIONS_BY_CHAT, {}) - - # 按群聊统计表格 - chat_rows = "" - for chat_id in sorted(cycles_by_chat.keys()): - cycles = cycles_by_chat[chat_id] - time_data = avg_time_by_chat.get(chat_id, {"decision": 0, "action": 0, "total": 0}) - decision_time = time_data.get("decision", 0) - action_time = time_data.get("action", 0) - total_time = time_data.get("total", 0) - chat_display_name = _get_chat_display_name(chat_id) - chat_rows += f""" -| 群聊名称 | 循环次数 | 决策时间 | 动作时间 | 总时间 |
|---|
| 动作类型 | 循环次数 | 决策时间 | 动作时间 | 总时间 |
|---|
时间说明:决策时间包括观察、处理、规划等步骤;动作时间是执行具体动作的时间。
- -| 版本 | 循环次数 | 决策时间 | 动作时间 | 总时间 |
|---|
系统中还没有HFC循环记录
") - - sections_html = "说明:此页面显示HFC模块的性能统计信息,包括各群聊、动作类型和版本的详细数据。
- - {sections_html} -