feat:提供focus统计,查看你的no_reply吧!!

This commit is contained in:
SengokuCola
2025-06-22 18:15:27 +08:00
parent 396cb9822b
commit c49f995c21

View File

@@ -14,6 +14,16 @@ from src.manager.local_store_manager import local_storage
logger = get_logger("maibot_statistic")
# HFC统计相关的键
HFC_TOTAL_CYCLES = "hfc_total_cycles"
HFC_CYCLES_BY_CHAT = "hfc_cycles_by_chat"
HFC_CYCLES_BY_ACTION = "hfc_cycles_by_action"
HFC_CYCLES_BY_VERSION = "hfc_cycles_by_version"
HFC_AVG_TIME_BY_CHAT = "hfc_avg_time_by_chat"
HFC_AVG_TIME_BY_ACTION = "hfc_avg_time_by_action"
HFC_AVG_TIME_BY_VERSION = "hfc_avg_time_by_version"
HFC_ACTIONS_BY_CHAT = "hfc_actions_by_chat" # 群聊×动作交叉统计
# 统计数据的键
TOTAL_REQ_CNT = "total_requests"
TOTAL_COST = "total_cost"
@@ -457,6 +467,147 @@ class StatisticOutputTask(AsyncTask):
break
return stats
def _collect_hfc_data_for_period(self, collect_period: List[Tuple[str, datetime]]) -> Dict[str, Any]:
"""
收集指定时间段的HFC统计数据
:param collect_period: 统计时间段
"""
if not collect_period:
return {}
# 为每个时间段初始化空的统计数据
stats = {
period_key: {
HFC_TOTAL_CYCLES: 0,
HFC_CYCLES_BY_CHAT: defaultdict(int),
HFC_CYCLES_BY_ACTION: defaultdict(int),
HFC_CYCLES_BY_VERSION: defaultdict(int),
HFC_AVG_TIME_BY_CHAT: defaultdict(lambda: {"decision": 0, "action": 0, "total": 0}),
HFC_AVG_TIME_BY_ACTION: defaultdict(lambda: {"decision": 0, "action": 0, "total": 0}),
HFC_AVG_TIME_BY_VERSION: defaultdict(lambda: {"decision": 0, "action": 0, "total": 0}),
HFC_ACTIONS_BY_CHAT: defaultdict(lambda: defaultdict(int)), # 群聊×动作交叉统计
}
for period_key, _ in collect_period
}
try:
import json
from pathlib import Path
hfc_stats_file = Path("data/hfc/time.json")
if not hfc_stats_file.exists():
logger.info("HFC统计文件不存在跳过HFC统计")
return stats
# 读取HFC统计数据
with open(hfc_stats_file, 'r', encoding='utf-8') as f:
hfc_data = json.load(f)
# 处理每个chat_id和版本的统计数据
for stats_key, chat_stats in hfc_data.items():
chat_id = chat_stats.get("chat_id", "unknown")
version = chat_stats.get("version", "unknown")
last_updated_str = chat_stats.get("last_updated")
if not last_updated_str:
continue
# 解析最后更新时间
try:
last_updated = datetime.fromisoformat(last_updated_str.replace('Z', '+00:00'))
if last_updated.tzinfo:
last_updated = last_updated.replace(tzinfo=None)
except:
continue
# 对于"全部时间",所有数据都包含
# 对于其他时间段,只包含在时间范围内更新的数据
applicable_periods = []
for period_key, period_start in collect_period:
if period_key == "all_time" or last_updated >= period_start:
applicable_periods.append(period_key)
if not applicable_periods:
continue
# 处理整体统计
overall = chat_stats.get("overall", {})
total_records = overall.get("total_records", 0)
avg_step_times = overall.get("avg_step_times", {})
# 计算决策时间和动作时间
action_time = avg_step_times.get("执行动作", 0)
total_time = overall.get("avg_total_time", 0)
decision_time = max(0, total_time - action_time)
for period_key in applicable_periods:
stats[period_key][HFC_TOTAL_CYCLES] += total_records
stats[period_key][HFC_CYCLES_BY_CHAT][chat_id] += total_records
stats[period_key][HFC_CYCLES_BY_VERSION][version] += total_records
# 处理按动作类型的统计
by_action = chat_stats.get("by_action", {})
for action_type, action_data in by_action.items():
count = action_data.get("count", 0)
action_step_times = action_data.get("avg_step_times", {})
action_total_time = action_data.get("avg_total_time", 0)
# 计算该动作类型的决策时间和动作时间
action_exec_time = action_step_times.get("执行动作", 0)
action_decision_time = max(0, action_total_time - action_exec_time)
for period_key in applicable_periods:
stats[period_key][HFC_CYCLES_BY_ACTION][action_type] += count
# 群聊×动作交叉统计
stats[period_key][HFC_ACTIONS_BY_CHAT][chat_id][action_type] += count
# 累加时间统计(用于后续计算加权平均)
# 这里我们需要重新设计数据结构来存储累计值
if chat_id not in stats[period_key][HFC_AVG_TIME_BY_CHAT]:
stats[period_key][HFC_AVG_TIME_BY_CHAT][chat_id] = {"decision": 0, "action": 0, "total": 0, "count": 0}
if action_type not in stats[period_key][HFC_AVG_TIME_BY_ACTION]:
stats[period_key][HFC_AVG_TIME_BY_ACTION][action_type] = {"decision": 0, "action": 0, "total": 0, "count": 0}
if version not in stats[period_key][HFC_AVG_TIME_BY_VERSION]:
stats[period_key][HFC_AVG_TIME_BY_VERSION][version] = {"decision": 0, "action": 0, "total": 0, "count": 0}
# 累加加权值(时间*数量)
stats[period_key][HFC_AVG_TIME_BY_CHAT][chat_id]["decision"] += decision_time * total_records
stats[period_key][HFC_AVG_TIME_BY_CHAT][chat_id]["action"] += action_time * total_records
stats[period_key][HFC_AVG_TIME_BY_CHAT][chat_id]["total"] += total_time * total_records
stats[period_key][HFC_AVG_TIME_BY_CHAT][chat_id]["count"] += total_records
stats[period_key][HFC_AVG_TIME_BY_ACTION][action_type]["decision"] += action_decision_time * count
stats[period_key][HFC_AVG_TIME_BY_ACTION][action_type]["action"] += action_exec_time * count
stats[period_key][HFC_AVG_TIME_BY_ACTION][action_type]["total"] += action_total_time * count
stats[period_key][HFC_AVG_TIME_BY_ACTION][action_type]["count"] += count
stats[period_key][HFC_AVG_TIME_BY_VERSION][version]["decision"] += decision_time * total_records
stats[period_key][HFC_AVG_TIME_BY_VERSION][version]["action"] += action_time * total_records
stats[period_key][HFC_AVG_TIME_BY_VERSION][version]["total"] += total_time * total_records
stats[period_key][HFC_AVG_TIME_BY_VERSION][version]["count"] += total_records
except Exception as e:
logger.error(f"收集HFC统计数据失败: {e}")
# 计算加权平均时间
for period_key in stats:
for stat_type in [HFC_AVG_TIME_BY_CHAT, HFC_AVG_TIME_BY_ACTION, HFC_AVG_TIME_BY_VERSION]:
for key, time_data in stats[period_key][stat_type].items():
if time_data.get("count", 0) > 0:
count = time_data["count"]
stats[period_key][stat_type][key] = {
"decision": time_data["decision"] / count,
"action": time_data["action"] / count,
"total": time_data["total"] / count
}
else:
stats[period_key][stat_type][key] = {"decision": 0, "action": 0, "total": 0}
return stats
def _collect_all_statistics(self, now: datetime) -> Dict[str, Dict[str, Any]]:
"""
收集各时间段的统计数据
@@ -483,20 +634,58 @@ class StatisticOutputTask(AsyncTask):
online_time_stat = self._collect_online_time_for_period(stat_start_timestamp, now)
message_count_stat = self._collect_message_count_for_period(stat_start_timestamp)
# HFC统计数据收集
hfc_stat = self._collect_hfc_data_for_period(stat_start_timestamp)
# 统计数据合并
# 合并类统计数据
# 合并类统计数据
for period_key, _ in stat_start_timestamp:
stat[period_key].update(model_req_stat[period_key])
stat[period_key].update(online_time_stat[period_key])
stat[period_key].update(message_count_stat[period_key])
stat[period_key].update(hfc_stat[period_key])
if last_all_time_stat:
# 若存在上次完整统计数据,则将其与当前统计数据合并
for key, val in last_all_time_stat.items():
# 跳过已删除的SUCCESS_RATE相关key
if key in ["hfc_success_rate_by_chat", "hfc_success_rate_by_action", "hfc_success_rate_by_version"]:
continue
# 确保当前统计数据中存在该key
if key not in stat["all_time"]:
continue
if isinstance(val, dict):
# 是字典类型,则进行合并
for sub_key, sub_val in val.items():
stat["all_time"][key][sub_key] += sub_val
# 检查是否是HFC的嵌套字典时间数据
if key in [HFC_AVG_TIME_BY_CHAT, HFC_AVG_TIME_BY_ACTION, HFC_AVG_TIME_BY_VERSION] and isinstance(sub_val, dict):
# 对于HFC时间数据需要特殊处理
if sub_key not in stat["all_time"][key]:
stat["all_time"][key][sub_key] = {"decision": 0, "action": 0, "total": 0}
# 合并嵌套的时间数据
for time_type, time_val in sub_val.items():
if time_type in stat["all_time"][key][sub_key]:
stat["all_time"][key][sub_key][time_type] += time_val
elif key == HFC_ACTIONS_BY_CHAT and isinstance(sub_val, dict):
# 对于群聊×动作交叉统计的二层嵌套字典,需要特殊处理
if sub_key not in stat["all_time"][key]:
stat["all_time"][key][sub_key] = {}
# 合并二层嵌套的动作数据
for action_type, action_count in sub_val.items():
if action_type in stat["all_time"][key][sub_key]:
stat["all_time"][key][sub_key][action_type] += action_count
else:
stat["all_time"][key][sub_key][action_type] = action_count
else:
# 普通的数值或字典合并
if sub_key in stat["all_time"][key]:
stat["all_time"][key][sub_key] += sub_val
else:
stat["all_time"][key][sub_key] = sub_val
else:
# 直接合并
stat["all_time"][key] += val
@@ -580,6 +769,8 @@ class StatisticOutputTask(AsyncTask):
]
# 添加图表选项卡
tab_list.append('<button class="tab-link" onclick="showTab(event, \'charts\')">数据图表</button>')
# 添加HFC统计选项卡
tab_list.append('<button class="tab-link" onclick="showTab(event, \'hfc_stats\')">HFC统计</button>')
def _format_stat_data(stat_data: dict[str, Any], div_id: str, start_time: datetime) -> str:
"""
@@ -705,6 +896,9 @@ class StatisticOutputTask(AsyncTask):
# 添加图表内容
chart_data = self._generate_chart_data(stat)
tab_content_list.append(self._generate_chart_tab(chart_data))
# 添加HFC统计内容
tab_content_list.append(self._generate_hfc_stats_tab(stat))
joined_tab_list = "\n".join(tab_list)
joined_tab_content = "\n".join(tab_content_list)
@@ -953,6 +1147,208 @@ class StatisticOutputTask(AsyncTask):
"message_by_chat": message_by_chat,
}
def _generate_hfc_stats_tab(self, stat: dict[str, Any]) -> str:
"""生成HFC统计选项卡HTML内容"""
def _get_chat_display_name(chat_id):
"""获取聊天显示名称"""
if chat_id in self.name_mapping:
return self.name_mapping[chat_id][0]
else:
return chat_id
def _generate_overview_section(data, title):
"""生成总览部分"""
total_cycles = data.get(HFC_TOTAL_CYCLES, 0)
if total_cycles == 0:
return f"<h3>{title}</h3><p>暂无HFC数据</p>"
def _generate_chat_action_table(actions_by_chat):
"""生成群聊×动作选择率表格"""
if not actions_by_chat:
return "<h4>按群聊的动作选择率</h4><p>暂无数据</p>"
# 获取所有动作类型
all_actions = set()
for chat_actions in actions_by_chat.values():
all_actions.update(chat_actions.keys())
if not all_actions:
return "<h4>按群聊的动作选择率</h4><p>暂无数据</p>"
all_actions = sorted(all_actions)
# 生成表头
action_headers = ""
for action in all_actions:
action_display = action
if action == "no_reply":
action_display = "不回复"
action_headers += f"<th>{action_display}</th>"
# 生成表格行
table_rows = ""
for chat_id in sorted(actions_by_chat.keys()):
chat_actions = actions_by_chat[chat_id]
chat_total = sum(chat_actions.values())
if chat_total == 0:
continue
chat_display_name = _get_chat_display_name(chat_id)
table_rows += f"<tr><td>{chat_display_name}</td>"
# 为每个动作生成百分比
for action in all_actions:
count = chat_actions.get(action, 0)
percentage = (count / chat_total * 100) if chat_total > 0 else 0
table_rows += f"<td>{count} ({percentage:.1f}%)</td>"
table_rows += f"<td>{chat_total}</td></tr>"
return f"""
<h4>按群聊的动作选择率</h4>
<table>
<thead>
<tr><th>群聊名称</th>{action_headers}<th>总计</th></tr>
</thead>
<tbody>
{table_rows}
</tbody>
</table>
<p class="info-item"><strong>说明:</strong>显示每个群聊中不同动作类型的选择次数及占比。</p>
"""
cycles_by_chat = data.get(HFC_CYCLES_BY_CHAT, {})
cycles_by_action = data.get(HFC_CYCLES_BY_ACTION, {})
cycles_by_version = data.get(HFC_CYCLES_BY_VERSION, {})
avg_time_by_chat = data.get(HFC_AVG_TIME_BY_CHAT, {})
avg_time_by_action = data.get(HFC_AVG_TIME_BY_ACTION, {})
avg_time_by_version = data.get(HFC_AVG_TIME_BY_VERSION, {})
actions_by_chat = data.get(HFC_ACTIONS_BY_CHAT, {})
# 按群聊统计表格
chat_rows = ""
for chat_id in sorted(cycles_by_chat.keys()):
cycles = cycles_by_chat[chat_id]
time_data = avg_time_by_chat.get(chat_id, {"decision": 0, "action": 0, "total": 0})
decision_time = time_data.get("decision", 0)
action_time = time_data.get("action", 0)
total_time = time_data.get("total", 0)
chat_display_name = _get_chat_display_name(chat_id)
chat_rows += f"""
<tr>
<td>{chat_display_name}</td>
<td>{cycles}</td>
<td>{decision_time:.2f}s</td>
<td>{action_time:.2f}s</td>
<td>{total_time:.2f}s</td>
</tr>"""
# 按动作类型统计表格 - 添加说明
action_rows = ""
for action_type in sorted(cycles_by_action.keys()):
cycles = cycles_by_action[action_type]
time_data = avg_time_by_action.get(action_type, {"decision": 0, "action": 0, "total": 0})
decision_time = time_data.get("decision", 0)
action_time = time_data.get("action", 0)
total_time = time_data.get("total", 0)
# 为no_reply添加说明
action_display = action_type
if action_type == "no_reply":
action_display = f"{action_type} (不回复决策)"
action_rows += f"""
<tr>
<td>{action_display}</td>
<td>{cycles}</td>
<td>{decision_time:.2f}s</td>
<td>{action_time:.2f}s</td>
<td>{total_time:.2f}s</td>
</tr>"""
# 按版本统计表格
version_rows = ""
for version in sorted(cycles_by_version.keys()):
cycles = cycles_by_version[version]
time_data = avg_time_by_version.get(version, {"decision": 0, "action": 0, "total": 0})
decision_time = time_data.get("decision", 0)
action_time = time_data.get("action", 0)
total_time = time_data.get("total", 0)
version_rows += f"""
<tr>
<td>{version}</td>
<td>{cycles}</td>
<td>{decision_time:.2f}s</td>
<td>{action_time:.2f}s</td>
<td>{total_time:.2f}s</td>
</tr>"""
return f"""
<h3>{title} (总循环数: {total_cycles})</h3>
<h4>按群聊统计</h4>
<table>
<thead>
<tr><th>群聊名称</th><th>循环次数</th><th>决策时间</th><th>动作时间</th><th>总时间</th></tr>
</thead>
<tbody>
{chat_rows}
</tbody>
</table>
<h4>按动作类型统计</h4>
<table>
<thead>
<tr><th>动作类型</th><th>循环次数</th><th>决策时间</th><th>动作时间</th><th>总时间</th></tr>
</thead>
<tbody>
{action_rows}
</tbody>
</table>
<p class="info-item"><strong>时间说明:</strong>决策时间包括观察、处理、规划等步骤;动作时间是执行具体动作的时间。</p>
<h4>按版本统计</h4>
<table>
<thead>
<tr><th>版本</th><th>循环次数</th><th>决策时间</th><th>动作时间</th><th>总时间</th></tr>
</thead>
<tbody>
{version_rows}
</tbody>
</table>
{_generate_chat_action_table(actions_by_chat)}
"""
# 生成指定时间段的统计
sections = []
# 定义要显示的时间段及其描述(所有时间在最上方)
time_periods = [
("all_time", "全部时间"),
("last_24_hours", "最近24小时"),
("last_7_days", "最近7天")
]
for period_key, period_desc in time_periods:
period_data = stat.get(period_key, {})
if period_data.get(HFC_TOTAL_CYCLES, 0) > 0: # 只显示有数据的时间段
sections.append(_generate_overview_section(period_data, period_desc))
if not sections:
sections.append("<h3>暂无HFC数据</h3><p>系统中还没有HFC循环记录</p>")
sections_html = "<br/>".join(sections)
return f"""
<div id="hfc_stats" class="tab-content">
<h2>HFC (Heart Flow Chat) 统计</h2>
<p class="info-item"><strong>说明:</strong>此页面显示HFC模块的性能统计信息包括各群聊、动作类型和版本的详细数据。</p>
{sections_html}
</div>
"""
def _generate_chart_tab(self, chart_data: dict) -> str:
"""生成图表选项卡HTML内容"""
@@ -1289,6 +1685,9 @@ class AsyncStatisticOutputTask(AsyncTask):
def _collect_message_count_for_period(self, collect_period: List[Tuple[str, datetime]]) -> Dict[str, Any]:
return StatisticOutputTask._collect_message_count_for_period(self, collect_period)
def _collect_hfc_data_for_period(self, collect_period: List[Tuple[str, datetime]]) -> Dict[str, Any]:
return StatisticOutputTask._collect_hfc_data_for_period(self, collect_period)
@staticmethod
def _format_total_stat(stats: Dict[str, Any]) -> str:
@@ -1309,3 +1708,6 @@ class AsyncStatisticOutputTask(AsyncTask):
def _generate_chart_tab(self, chart_data: dict) -> str:
return StatisticOutputTask._generate_chart_tab(self, chart_data)
def _generate_hfc_stats_tab(self, stat: dict[str, Any]) -> str:
return StatisticOutputTask._generate_hfc_stats_tab(self, stat)