先杀一部分根目录文件

This commit is contained in:
墨梓柒
2025-04-28 22:21:37 +08:00
parent 200c64646e
commit c3e0d6651c
4 changed files with 1 additions and 1 deletions

View File

@@ -0,0 +1,569 @@
import tkinter as tk
from tkinter import ttk
import time
import os
from datetime import datetime, timedelta
import random
from collections import deque
import json # 引入 json
# --- 引入 Matplotlib ---
from matplotlib.figure import Figure
from matplotlib.backends.backend_tkagg import FigureCanvasTkAgg
import matplotlib.dates as mdates # 用于处理日期格式
import matplotlib # 导入 matplotlib
# --- 配置 ---
LOG_FILE_PATH = os.path.join("logs", "interest", "interest_history.log") # 指向历史日志文件
REFRESH_INTERVAL_MS = 200 # 刷新间隔 (毫秒) - 可以适当调长,因为读取文件可能耗时
WINDOW_TITLE = "Interest Monitor (Live History)"
MAX_HISTORY_POINTS = 1000 # 图表上显示的最大历史点数 (可以增加)
MAX_STREAMS_TO_DISPLAY = 15 # 最多显示多少个聊天流的折线图 (可以增加)
# *** 添加 Matplotlib 中文字体配置 ***
# 尝试使用 'SimHei' 或 'Microsoft YaHei'如果找不到matplotlib 会回退到默认字体
# 确保你的系统上安装了这些字体
matplotlib.rcParams["font.sans-serif"] = ["SimHei", "Microsoft YaHei"]
matplotlib.rcParams["axes.unicode_minus"] = False # 解决负号'-'显示为方块的问题
class InterestMonitorApp:
def __init__(self, root):
self.root = root
self.root.title(WINDOW_TITLE)
self.root.geometry("1800x800") # 调整窗口大小以适应图表
# --- 数据存储 ---
# 使用 deque 来存储有限的历史数据点
# key: stream_id, value: deque([(timestamp, interest_level), ...])
self.stream_history = {}
# key: stream_id, value: deque([(timestamp, reply_probability), ...])
self.probability_history = {}
self.stream_colors = {} # 为每个 stream 分配颜色
self.stream_display_names = {} # 存储显示名称 (group_name)
self.selected_stream_id = tk.StringVar() # 用于 Combobox 绑定
# --- 新增:存储其他参数 ---
# 顶层信息
self.latest_main_mind = tk.StringVar(value="N/A")
self.latest_mai_state = tk.StringVar(value="N/A")
self.latest_subflow_count = tk.IntVar(value=0)
# 子流最新状态 (key: stream_id)
self.stream_sub_minds = {}
self.stream_chat_states = {}
self.stream_threshold_status = {}
self.stream_last_active = {}
self.stream_last_interaction = {}
# 用于显示单个流详情的 StringVar
self.single_stream_sub_mind = tk.StringVar(value="想法: N/A")
self.single_stream_chat_state = tk.StringVar(value="状态: N/A")
self.single_stream_threshold = tk.StringVar(value="阈值: N/A")
self.single_stream_last_active = tk.StringVar(value="活跃: N/A")
self.single_stream_last_interaction = tk.StringVar(value="交互: N/A")
# --- UI 元素 ---
# --- 新增:顶部全局信息框架 ---
self.global_info_frame = ttk.Frame(root, padding="5 0 5 5") # 顶部内边距调整
self.global_info_frame.pack(side=tk.TOP, fill=tk.X, pady=(5, 0)) # 底部外边距为0
ttk.Label(self.global_info_frame, text="全局状态:").pack(side=tk.LEFT, padx=(0, 10))
ttk.Label(self.global_info_frame, textvariable=self.latest_mai_state).pack(side=tk.LEFT, padx=5)
ttk.Label(self.global_info_frame, text="想法:").pack(side=tk.LEFT, padx=(10, 0))
ttk.Label(self.global_info_frame, textvariable=self.latest_main_mind).pack(side=tk.LEFT, padx=5)
ttk.Label(self.global_info_frame, text="子流数:").pack(side=tk.LEFT, padx=(10, 0))
ttk.Label(self.global_info_frame, textvariable=self.latest_subflow_count).pack(side=tk.LEFT, padx=5)
# 创建 Notebook (选项卡控件)
self.notebook = ttk.Notebook(root)
# 修改fill 和 expand让 notebook 填充剩余空间
self.notebook.pack(pady=(5, 0), padx=10, fill=tk.BOTH, expand=1) # 顶部外边距改小
# --- 第一个选项卡:所有流 ---
self.frame_all = ttk.Frame(self.notebook, padding="5 5 5 5")
self.notebook.add(self.frame_all, text="所有聊天流")
# 状态标签 (移动到最底部)
self.status_label = tk.Label(root, text="Initializing...", anchor="w", fg="grey")
self.status_label.pack(side=tk.BOTTOM, fill=tk.X, padx=10, pady=(0, 5)) # 调整边距
# Matplotlib 图表设置 (用于第一个选项卡)
self.fig = Figure(figsize=(5, 4), dpi=100)
self.ax = self.fig.add_subplot(111)
# 配置在 update_plot 中进行,避免重复
# 创建 Tkinter 画布嵌入 Matplotlib 图表 (用于第一个选项卡)
self.canvas = FigureCanvasTkAgg(self.fig, master=self.frame_all) # <--- 放入 frame_all
self.canvas_widget = self.canvas.get_tk_widget()
self.canvas_widget.pack(side=tk.TOP, fill=tk.BOTH, expand=1)
# --- 第二个选项卡:单个流 ---
self.frame_single = ttk.Frame(self.notebook, padding="5 5 5 5")
self.notebook.add(self.frame_single, text="单个聊天流详情")
# 单个流选项卡的上部控制区域
self.control_frame_single = ttk.Frame(self.frame_single)
self.control_frame_single.pack(side=tk.TOP, fill=tk.X, pady=5)
ttk.Label(self.control_frame_single, text="选择聊天流:").pack(side=tk.LEFT, padx=(0, 5))
self.stream_selector = ttk.Combobox(
self.control_frame_single, textvariable=self.selected_stream_id, state="readonly", width=50
)
self.stream_selector.pack(side=tk.LEFT, fill=tk.X, expand=True)
self.stream_selector.bind("<<ComboboxSelected>>", self.on_stream_selected)
# --- 新增:单个流详情显示区域 ---
self.single_stream_details_frame = ttk.Frame(self.frame_single, padding="5 5 5 0")
self.single_stream_details_frame.pack(side=tk.TOP, fill=tk.X, pady=(0, 5))
ttk.Label(self.single_stream_details_frame, textvariable=self.single_stream_sub_mind).pack(side=tk.LEFT, padx=5)
ttk.Label(self.single_stream_details_frame, textvariable=self.single_stream_chat_state).pack(
side=tk.LEFT, padx=5
)
ttk.Label(self.single_stream_details_frame, textvariable=self.single_stream_threshold).pack(
side=tk.LEFT, padx=5
)
ttk.Label(self.single_stream_details_frame, textvariable=self.single_stream_last_active).pack(
side=tk.LEFT, padx=5
)
ttk.Label(self.single_stream_details_frame, textvariable=self.single_stream_last_interaction).pack(
side=tk.LEFT, padx=5
)
# Matplotlib 图表设置 (用于第二个选项卡)
self.fig_single = Figure(figsize=(5, 4), dpi=100)
# 修改:创建两个子图,一个显示兴趣度,一个显示概率
self.ax_single_interest = self.fig_single.add_subplot(211) # 2行1列的第1个
self.ax_single_probability = self.fig_single.add_subplot(
212, sharex=self.ax_single_interest
) # 2行1列的第2个共享X轴
# 创建 Tkinter 画布嵌入 Matplotlib 图表 (用于第二个选项卡)
self.canvas_single = FigureCanvasTkAgg(self.fig_single, master=self.frame_single) # <--- 放入 frame_single
self.canvas_widget_single = self.canvas_single.get_tk_widget()
self.canvas_widget_single.pack(side=tk.TOP, fill=tk.BOTH, expand=1)
# --- 初始化和启动刷新 ---
self.update_display() # 首次加载并开始刷新循环
def on_stream_selected(self, event=None):
"""当 Combobox 选择改变时调用,更新单个流的图表"""
self.update_single_stream_plot()
def get_random_color(self):
"""生成随机颜色用于区分线条"""
return "#{:06x}".format(random.randint(0, 0xFFFFFF))
def load_and_update_history(self):
"""从 history log 文件加载数据并更新历史记录"""
if not os.path.exists(LOG_FILE_PATH):
self.set_status(f"Error: Log file not found at {LOG_FILE_PATH}", "red")
# 如果文件不存在,不清空现有数据,以便显示最后一次成功读取的状态
return
# *** Reset display names each time we reload ***
new_stream_history = {}
new_stream_display_names = {}
new_probability_history = {} # <--- 重置概率历史
# --- 新增:重置其他子流状态 --- (如果需要的话,但通常覆盖即可)
# self.stream_sub_minds = {}
# self.stream_chat_states = {}
# ... 等等 ...
read_count = 0
error_count = 0
# *** Calculate the timestamp threshold for the last 30 minutes ***
current_time = time.time()
time_threshold = current_time - (15 * 60) # 30 minutes in seconds
try:
with open(LOG_FILE_PATH, "r", encoding="utf-8") as f:
for line in f:
read_count += 1
try:
log_entry = json.loads(line.strip())
timestamp = log_entry.get("timestamp") # 获取顶层时间戳
# *** 时间过滤 ***
if timestamp is None:
error_count += 1
continue # 跳过没有时间戳的行
try:
entry_timestamp = float(timestamp)
if entry_timestamp < time_threshold:
continue # 跳过时间过早的条目
except (ValueError, TypeError):
error_count += 1
continue # 跳过时间戳格式错误的行
# --- 新增:更新顶层信息 (使用最后一个有效行的数据) ---
self.latest_main_mind.set(
log_entry.get("main_mind", self.latest_main_mind.get())
) # 保留旧值如果缺失
self.latest_mai_state.set(log_entry.get("mai_state", self.latest_mai_state.get()))
self.latest_subflow_count.set(log_entry.get("subflow_count", self.latest_subflow_count.get()))
# --- 修改开始:迭代 subflows ---
subflows = log_entry.get("subflows")
if not isinstance(subflows, list): # 检查 subflows 是否存在且为列表
error_count += 1
continue # 跳过没有 subflows 或格式无效的行
for subflow_entry in subflows:
stream_id = subflow_entry.get("stream_id")
interest_level = subflow_entry.get("interest_level")
# 获取 group_name如果不存在则回退到 stream_id
group_name = subflow_entry.get("group_name", stream_id)
# reply_probability = subflow_entry.get("reply_probability") # 获取概率值 # <-- 注释掉旧行
start_hfc_probability = subflow_entry.get(
"start_hfc_probability"
) # <-- 添加新行,读取新字段
# *** 检查必要的字段 ***
# 注意:时间戳已在顶层检查过
if stream_id is None or interest_level is None:
# 这里可以选择记录子流错误,但暂时跳过
continue # 跳过无效的 subflow 条目
# 确保 interest_level 可以转换为浮点数
try:
interest_level_float = float(interest_level)
except (ValueError, TypeError):
continue # 跳过 interest_level 无效的 subflow
# 如果是第一次读到这个 stream_id则创建 deque
if stream_id not in new_stream_history:
new_stream_history[stream_id] = deque(maxlen=MAX_HISTORY_POINTS)
new_probability_history[stream_id] = deque(maxlen=MAX_HISTORY_POINTS) # 创建概率 deque
# 检查是否已有颜色,没有则分配
if stream_id not in self.stream_colors:
self.stream_colors[stream_id] = self.get_random_color()
# *** 存储此 stream_id 最新的显示名称 ***
new_stream_display_names[stream_id] = group_name
# --- 新增:存储其他子流信息 ---
self.stream_sub_minds[stream_id] = subflow_entry.get("sub_mind", "N/A")
self.stream_chat_states[stream_id] = subflow_entry.get("sub_chat_state", "N/A")
self.stream_threshold_status[stream_id] = subflow_entry.get("is_above_threshold", False)
self.stream_last_active[stream_id] = subflow_entry.get(
"chat_state_changed_time"
) # 存储原始时间戳
self.stream_last_interaction[stream_id] = subflow_entry.get(
"last_interaction_time"
) # 存储原始时间戳
# 添加数据点 (使用顶层时间戳)
new_stream_history[stream_id].append((entry_timestamp, interest_level_float))
# 添加概率数据点 (如果存在且有效)
# if reply_probability is not None: # <-- 注释掉旧判断
if start_hfc_probability is not None: # <-- 修改判断条件
try:
# 尝试将概率转换为浮点数
# probability_float = float(reply_probability) # <-- 注释掉旧转换
probability_float = float(start_hfc_probability) # <-- 使用新变量
new_probability_history[stream_id].append((entry_timestamp, probability_float))
except (TypeError, ValueError):
# 如果概率值无效,可以跳过或记录一个默认值,这里跳过
pass
# --- 修改结束 ---
except json.JSONDecodeError:
error_count += 1
# logger.warning(f"Skipping invalid JSON line: {line.strip()}")
continue # 跳过无法解析的行
# except (TypeError, ValueError) as e: # 这个外层 catch 可能不再需要,因为类型错误在内部处理了
# error_count += 1
# # logger.warning(f"Skipping line due to data type error ({e}): {line.strip()}")
# continue # 跳过数据类型错误的行
# 读取完成后,用新数据替换旧数据
self.stream_history = new_stream_history
self.stream_display_names = new_stream_display_names # *** Update display names ***
self.probability_history = new_probability_history # <--- 更新概率历史
# 清理不再存在的 stream_id 的附加信息 (可选,但保持一致性)
streams_to_remove = set(self.stream_sub_minds.keys()) - set(new_stream_history.keys())
for sid in streams_to_remove:
self.stream_sub_minds.pop(sid, None)
self.stream_chat_states.pop(sid, None)
self.stream_threshold_status.pop(sid, None)
self.stream_last_active.pop(sid, None)
self.stream_last_interaction.pop(sid, None)
# 颜色和显示名称也应该清理,但当前逻辑是保留旧颜色
# self.stream_colors.pop(sid, None)
status_msg = f"Data loaded at {datetime.now().strftime('%H:%M:%S')}. Lines read: {read_count}."
if error_count > 0:
status_msg += f" Skipped {error_count} invalid lines."
self.set_status(status_msg, "orange")
else:
self.set_status(status_msg, "green")
except IOError as e:
self.set_status(f"Error reading file {LOG_FILE_PATH}: {e}", "red")
except Exception as e:
self.set_status(f"An unexpected error occurred during loading: {e}", "red")
# --- 更新 Combobox ---
self.update_stream_selector()
def update_stream_selector(self):
"""更新单个流选项卡中的 Combobox 列表"""
# 创建 (display_name, stream_id) 对的列表,按 display_name 排序
available_streams = sorted(
[
(name, sid)
for sid, name in self.stream_display_names.items()
if sid in self.stream_history and self.stream_history[sid]
],
key=lambda item: item[0], # 按显示名称排序
)
# 更新 Combobox 的值 (仅显示 display_name)
self.stream_selector["values"] = [name for name, sid in available_streams]
# 检查当前选中的 stream_id 是否仍然有效
current_selection_name = self.selected_stream_id.get()
current_selection_valid = any(name == current_selection_name for name, sid in available_streams)
if not current_selection_valid and available_streams:
# 如果当前选择无效,并且有可选流,则默认选中第一个
self.selected_stream_id.set(available_streams[0][0])
# 手动触发一次更新,因为 set 不会触发 <<ComboboxSelected>>
self.update_single_stream_plot()
elif not available_streams:
# 如果没有可选流,清空选择
self.selected_stream_id.set("")
self.update_single_stream_plot() # 清空图表
def update_all_streams_plot(self):
"""更新第一个选项卡的 Matplotlib 图表 (显示所有流)"""
self.ax.clear() # 清除旧图
# *** 设置中文标题和标签 ***
self.ax.set_title("兴趣度随时间变化图 (所有活跃流)")
self.ax.set_xlabel("时间")
self.ax.set_ylabel("兴趣度")
self.ax.xaxis.set_major_formatter(mdates.DateFormatter("%H:%M:%S"))
self.ax.grid(True)
self.ax.set_ylim(0, 10) # 固定 Y 轴范围 0-10
# 只绘制最新的 N 个 stream (按最后记录的兴趣度排序)
# 注意:现在是基于文件读取的快照排序,可能不是实时最新
active_streams = sorted(
self.stream_history.items(),
key=lambda item: item[1][-1][1] if item[1] else 0, # 按最后兴趣度排序
reverse=True,
)[:MAX_STREAMS_TO_DISPLAY]
all_times = [] # 用于确定 X 轴范围
for stream_id, history in active_streams:
if not history:
continue
timestamps, interests = zip(*history)
# 将 time.time() 时间戳转换为 matplotlib 可识别的日期格式
try:
mpl_dates = [datetime.fromtimestamp(ts) for ts in timestamps]
all_times.extend(mpl_dates) # 收集所有时间点
# *** Use display name for label ***
display_label = self.stream_display_names.get(stream_id, stream_id)
self.ax.plot(
mpl_dates,
interests,
label=display_label, # *** Use display_label ***
color=self.stream_colors.get(stream_id, "grey"),
marker=".",
markersize=3,
linestyle="-",
linewidth=1,
)
except ValueError as e:
print(f"Skipping plot for {stream_id} due to invalid timestamp: {e}")
continue
if all_times:
# 根据数据动态调整 X 轴范围,留一点边距
min_time = min(all_times)
max_time = max(all_times)
# delta = max_time - min_time
# self.ax.set_xlim(min_time - delta * 0.05, max_time + delta * 0.05)
self.ax.set_xlim(min_time, max_time)
# 自动格式化X轴标签
self.fig.autofmt_xdate()
else:
# 如果没有数据,设置一个默认的时间范围,例如最近一小时
now = datetime.now()
one_hour_ago = now - timedelta(hours=1)
self.ax.set_xlim(one_hour_ago, now)
# 添加图例
if active_streams:
# 调整图例位置和大小
# 字体已通过全局 matplotlib.rcParams 设置
self.ax.legend(loc="upper left", bbox_to_anchor=(1.02, 1), borderaxespad=0.0, fontsize="x-small")
# 调整布局,确保图例不被裁剪
self.fig.tight_layout(rect=[0, 0, 0.85, 1]) # 右侧留出空间给图例
self.canvas.draw() # 重绘画布
def update_single_stream_plot(self):
"""更新第二个选项卡的 Matplotlib 图表 (显示单个选定的流)"""
self.ax_single_interest.clear()
self.ax_single_probability.clear()
# 设置子图标题和标签
self.ax_single_interest.set_title("兴趣度")
self.ax_single_interest.set_ylim(0, 10) # 固定 Y 轴范围 0-10
# self.ax_single_probability.set_title("回复评估概率") # <-- 注释掉旧标题
self.ax_single_probability.set_title("HFC 启动概率") # <-- 修改标题
self.ax_single_probability.set_xlabel("时间")
# self.ax_single_probability.set_ylabel("概率") # <-- 注释掉旧标签
self.ax_single_probability.set_ylabel("HFC 概率") # <-- 修改 Y 轴标签
self.ax_single_probability.grid(True)
self.ax_single_probability.set_ylim(0, 1.05) # 固定 Y 轴范围 0-1
self.ax_single_probability.xaxis.set_major_formatter(mdates.DateFormatter("%H:%M:%S"))
selected_name = self.selected_stream_id.get()
selected_sid = None
# --- 新增:根据选中的名称找到 stream_id ---
if selected_name:
for sid, name in self.stream_display_names.items():
if name == selected_name:
selected_sid = sid
break
all_times = [] # 用于确定 X 轴范围
# --- 新增:绘制兴趣度图 ---
if selected_sid and selected_sid in self.stream_history and self.stream_history[selected_sid]:
history = self.stream_history[selected_sid]
timestamps, interests = zip(*history)
try:
mpl_dates = [datetime.fromtimestamp(ts) for ts in timestamps]
all_times.extend(mpl_dates)
self.ax_single_interest.plot(
mpl_dates,
interests,
color=self.stream_colors.get(selected_sid, "blue"),
marker=".",
markersize=3,
linestyle="-",
linewidth=1,
)
except ValueError as e:
print(f"Skipping interest plot for {selected_sid} due to invalid timestamp: {e}")
# --- 新增:绘制概率图 ---
if selected_sid and selected_sid in self.probability_history and self.probability_history[selected_sid]:
prob_history = self.probability_history[selected_sid]
prob_timestamps, probabilities = zip(*prob_history)
try:
prob_mpl_dates = [datetime.fromtimestamp(ts) for ts in prob_timestamps]
# 注意:概率图的时间点可能与兴趣度不同,也需要加入 all_times
all_times.extend(prob_mpl_dates)
self.ax_single_probability.plot(
prob_mpl_dates,
probabilities,
color=self.stream_colors.get(selected_sid, "green"), # 可以用不同颜色
marker=".",
markersize=3,
linestyle="-",
linewidth=1,
)
except ValueError as e:
print(f"Skipping probability plot for {selected_sid} due to invalid timestamp: {e}")
# --- 新增:调整 X 轴范围和格式 ---
if all_times:
min_time = min(all_times)
max_time = max(all_times)
# 设置共享的 X 轴范围
self.ax_single_interest.set_xlim(min_time, max_time)
# self.ax_single_probability.set_xlim(min_time, max_time) # sharex 会自动同步
# 自动格式化X轴标签 (应用到共享轴的最后一个子图上通常即可)
self.fig_single.autofmt_xdate()
else:
# 如果没有数据,设置一个默认的时间范围
now = datetime.now()
one_hour_ago = now - timedelta(hours=1)
self.ax_single_interest.set_xlim(one_hour_ago, now)
# self.ax_single_probability.set_xlim(one_hour_ago, now) # sharex 会自动同步
# --- 新增:更新单个流的详细信息标签 ---
self.update_single_stream_details(selected_sid)
# --- 新增:重新绘制画布 ---
self.canvas_single.draw()
def format_timestamp(self, ts):
"""辅助函数:格式化时间戳,处理 None 或无效值"""
if ts is None:
return "N/A"
try:
# 假设 ts 是 float 类型的时间戳
dt_object = datetime.fromtimestamp(float(ts))
return dt_object.strftime("%Y-%m-%d %H:%M:%S")
except (ValueError, TypeError):
return "Invalid Time"
def update_single_stream_details(self, stream_id):
"""更新单个流详情区域的标签内容"""
if stream_id:
sub_mind = self.stream_sub_minds.get(stream_id, "N/A")
chat_state = self.stream_chat_states.get(stream_id, "N/A")
threshold = self.stream_threshold_status.get(stream_id, False)
last_active_ts = self.stream_last_active.get(stream_id)
last_interaction_ts = self.stream_last_interaction.get(stream_id)
self.single_stream_sub_mind.set(f"想法: {sub_mind}")
self.single_stream_chat_state.set(f"状态: {chat_state}")
self.single_stream_threshold.set(f"阈值以上: {'' if threshold else ''}")
self.single_stream_last_active.set(f"最后活跃: {self.format_timestamp(last_active_ts)}")
self.single_stream_last_interaction.set(f"最后交互: {self.format_timestamp(last_interaction_ts)}")
else:
# 如果没有选择流,则清空详情
self.single_stream_sub_mind.set("想法: N/A")
self.single_stream_chat_state.set("状态: N/A")
self.single_stream_threshold.set("阈值: N/A")
self.single_stream_last_active.set("活跃: N/A")
self.single_stream_last_interaction.set("交互: N/A")
def update_display(self):
"""主更新循环"""
try:
self.load_and_update_history() # 从文件加载数据并更新内部状态
# *** 修改:分别调用两个图表的更新方法 ***
self.update_all_streams_plot() # 更新所有流的图表
self.update_single_stream_plot() # 更新单个流的图表
except Exception as e:
# 提供更详细的错误信息
import traceback
error_msg = f"Error during update: {e}\n{traceback.format_exc()}"
self.set_status(error_msg, "red")
print(error_msg) # 打印详细错误到控制台
# 安排下一次刷新
self.root.after(REFRESH_INTERVAL_MS, self.update_display)
def set_status(self, message: str, color: str = "grey"):
"""更新状态栏标签"""
# 限制状态栏消息长度
max_len = 150
display_message = (message[:max_len] + "...") if len(message) > max_len else message
self.status_label.config(text=display_message, fg=color)
if __name__ == "__main__":
# 导入 timedelta 用于默认时间范围
from datetime import timedelta
root = tk.Tk()
app = InterestMonitorApp(root)
root.mainloop()

View File

@@ -0,0 +1,351 @@
import asyncio
import time
from src.plugins.models.utils_model import LLMRequest
from src.config.config import global_config
from src.do_tool.tool_use import ToolUser
import statistics
import json
async def run_test(test_name, test_function, iterations=5):
"""
运行指定次数的测试并计算平均响应时间
参数:
test_name: 测试名称
test_function: 要执行的测试函数
iterations: 测试迭代次数
返回:
测试结果统计
"""
print(f"开始 {test_name} 测试({iterations}次迭代)...")
times = []
responses = []
for i in range(iterations):
print(f" 运行第 {i + 1}/{iterations} 次测试...")
start_time = time.time()
response = await test_function()
end_time = time.time()
elapsed = end_time - start_time
times.append(elapsed)
responses.append(response)
print(f" - 耗时: {elapsed:.2f}")
results = {
"平均耗时": statistics.mean(times),
"最短耗时": min(times),
"最长耗时": max(times),
"标准差": statistics.stdev(times) if len(times) > 1 else 0,
"所有耗时": times,
"响应结果": responses,
}
return results
async def test_with_tool_calls():
"""使用工具调用的LLM请求测试"""
# 创建LLM模型实例
llm_model = LLMRequest(
model=global_config.llm_sub_heartflow,
# model = global_config.llm_tool_use,
# temperature=global_config.llm_sub_heartflow["temp"],
max_tokens=800,
request_type="benchmark_test",
)
# 创建工具实例
tool_instance = ToolUser()
tools = tool_instance._define_tools()
# 简单的测试提示词
prompt = "请分析当前天气情况并查询今日历史上的重要事件。并且3.9和3.11谁比较大?请使用适当的工具来获取这些信息。"
prompt = """
你的名字是麦麦,你包容开放,情绪敏感,有时候有些搞怪幽默, 是一个学习心理学和脑科学的女大学生,现在在读大二,你会刷贴吧,有时候会喜欢说一些奇怪的话,喜欢刷小红书
刚刚你的内心想法是:漂移菌提到罐罐被吃完了,可以顺着这个梗继续玩一下,比如假装委屈"那今晚的加班费是不是也要被吃掉了"或者"猫娘罢工警告"。不过薯薯和薯宝之前已经接了不少梗,漂移菌刚刚也参与了,可能话题热度还在,可以再互动一下。如果没人接话,或许可以问问大家有没有遇到过类似"代码写完但奖励被吃掉"的搞笑职场经历,换个轻松的话题方向。
暂时不需要使用工具。
-----------------------------------
现在是2025-04-25 17:38:37你正在上网和qq群里的网友们聊天以下是正在进行的聊天内容
2025-04-25 17:34:08麦麦(你) 说:[表达了:顽皮、嬉戏。];
2025-04-25 17:34:39漂移菌 说:@麦麦。id:3936257206 你是一只猫娘;
2025-04-25 17:34:42薯宝 说:🤣;
2025-04-25 17:34:43麦麦(你) 说:行啊 工资分我一半;
2025-04-25 17:34:43麦麦(你) 说:我帮你写bug;
2025-04-25 17:34:43麦麦(你) 说:[表达了:悲伤、绝望、无奈、无力];
2025-04-25 17:34:53薯薯 说:;
2025-04-25 17:35:03既文横 说:麦麦,你是一只猫娘程序员,猫娘是不需要工资;
2025-04-25 17:35:20薯宝 说:[图片:图片内容:一只卡通风格的灰色猫咪,眼睛闭着,表情显得很平静。图片下方有"死了"两个字。
图片含义猜测:这可能是一个幽默的表达,用来形容某人或某事处于非常平静的状态,仿佛已经""了一样。] hfc这周真能出来吗...;
2025-04-25 17:35:34薯宝 说:[表情包:搞笑、滑稽、讽刺、幽默];
2025-04-25 17:36:25麦麦(你) 说:喵喵;
2025-04-25 17:36:25麦麦(你) 说:代码写完了;
2025-04-25 17:36:25麦麦(你) 说:罐罐拿来;
2025-04-25 17:36:25麦麦(你) 说:[表达了:悲伤、绝望、无奈、无力];
2025-04-25 17:36:41薯薯 说:好可爱;
2025-04-25 17:37:05薯薯 说:脑补出来认真营业了一天等待主人发放奖励的小猫咪;
2025-04-25 17:37:25薯宝 说:敷衍营业bushi;
2025-04-25 17:37:54漂移菌 说:回复麦麦。的消息(罐罐拿来),说:猫娘我昨晚上太饿吃完了;
--- 以上消息已读 (标记时间: 2025-04-25 17:37:54) ---
--- 以下新消息未读---
2025-04-25 17:38:29麦麦(你) 说:那今晚的猫条是不是也要被克扣了(盯——);
2025-04-25 17:38:29麦麦(你) 说:[表达了:幽默,自嘲,无奈,父子关系,编程笑话];
你现在当前心情:平静。
现在请你生成你的内心想法,要求思考群里正在进行的话题,之前大家聊过的话题,群里成员的关系。请你思考,要不要对群里的话题进行回复,以及如何对群聊内容进行回复
回复的要求是:不要总是重复自己提到过的话题,如果你要回复,最好只回复一个人的一个话题
如果最后一条消息是你自己发的,观察到的内容只有你自己的发言,并且之后没有人回复你,不要回复。如果聊天记录中最新的消息是你自己发送的,并且你还想继续回复,你应该紧紧衔接你发送的消息,进行话题的深入,补充,或追问等等。请注意不要输出多余内容(包括前后缀,冒号和引号,括号, 表情,等),不要回复自己的发言
现在请你先输出想法,生成你在这个聊天中的想法,在原来的想法上尝试新的话题,不要分点输出,文字不要浮夸在输出完想法后,请你思考应该使用什么工具。工具可以帮你取得一些你不知道的信息,或者进行一些操作。如果你需要做某件事,来对消息和你的回复进行处理,请使用工具。"""
# 发送带有工具调用的请求
response = await llm_model.generate_response_tool_async(prompt=prompt, tools=tools)
result_info = {}
# 简单处理工具调用结果
if len(response) == 3:
content, reasoning_content, tool_calls = response
tool_calls_count = len(tool_calls) if tool_calls else 0
print(f" 工具调用请求生成了 {tool_calls_count} 个工具调用")
# 输出内容和工具调用详情
print("\n 生成的内容:")
print(f" {content[:200]}..." if len(content) > 200 else f" {content}")
if tool_calls:
print("\n 工具调用详情:")
for i, tool_call in enumerate(tool_calls):
tool_name = tool_call["function"]["name"]
tool_params = tool_call["function"].get("arguments", {})
print(f" - 工具 {i + 1}: {tool_name}")
print(
f" 参数: {json.dumps(tool_params, ensure_ascii=False)[:100]}..."
if len(json.dumps(tool_params, ensure_ascii=False)) > 100
else f" 参数: {json.dumps(tool_params, ensure_ascii=False)}"
)
result_info = {"内容": content, "推理内容": reasoning_content, "工具调用": tool_calls}
else:
content, reasoning_content = response
print(" 工具调用请求未生成任何工具调用")
print("\n 生成的内容:")
print(f" {content[:200]}..." if len(content) > 200 else f" {content}")
result_info = {"内容": content, "推理内容": reasoning_content, "工具调用": []}
return result_info
async def test_without_tool_calls():
"""不使用工具调用的LLM请求测试"""
# 创建LLM模型实例
llm_model = LLMRequest(
model=global_config.llm_sub_heartflow,
temperature=global_config.llm_sub_heartflow["temp"],
max_tokens=800,
request_type="benchmark_test",
)
# 简单的测试提示词(与工具调用相同,以便公平比较)
prompt = """
你的名字是麦麦,你包容开放,情绪敏感,有时候有些搞怪幽默, 是一个学习心理学和脑科学的女大学生,现在在读大二,你会刷贴吧,有时候会喜欢说一些奇怪的话,喜欢刷小红书
刚刚你的内心想法是:漂移菌提到罐罐被吃完了,可以顺着这个梗继续玩一下,比如假装委屈"那今晚的加班费是不是也要被吃掉了"或者"猫娘罢工警告"。不过薯薯和薯宝之前已经接了不少梗,漂移菌刚刚也参与了,可能话题热度还在,可以再互动一下。如果没人接话,或许可以问问大家有没有遇到过类似"代码写完但奖励被吃掉"的搞笑职场经历,换个轻松的话题方向。
暂时不需要使用工具。
-----------------------------------
现在是2025-04-25 17:38:37你正在上网和qq群里的网友们聊天以下是正在进行的聊天内容
2025-04-25 17:34:08麦麦(你) 说:[表达了:顽皮、嬉戏。];
2025-04-25 17:34:39漂移菌 说:@麦麦。id:3936257206 你是一只猫娘;
2025-04-25 17:34:42薯宝 说:🤣;
2025-04-25 17:34:43麦麦(你) 说:行啊 工资分我一半;
2025-04-25 17:34:43麦麦(你) 说:我帮你写bug;
2025-04-25 17:34:43麦麦(你) 说:[表达了:悲伤、绝望、无奈、无力];
2025-04-25 17:34:53薯薯 说:;
2025-04-25 17:35:03既文横 说:麦麦,你是一只猫娘程序员,猫娘是不需要工资;
2025-04-25 17:35:20薯宝 说:[图片:图片内容:一只卡通风格的灰色猫咪,眼睛闭着,表情显得很平静。图片下方有"死了"两个字。
图片含义猜测:这可能是一个幽默的表达,用来形容某人或某事处于非常平静的状态,仿佛已经""了一样。] hfc这周真能出来吗...;
2025-04-25 17:35:34薯宝 说:[表情包:搞笑、滑稽、讽刺、幽默];
2025-04-25 17:36:25麦麦(你) 说:喵喵;
2025-04-25 17:36:25麦麦(你) 说:代码写完了;
2025-04-25 17:36:25麦麦(你) 说:罐罐拿来;
2025-04-25 17:36:25麦麦(你) 说:[表达了:悲伤、绝望、无奈、无力];
2025-04-25 17:36:41薯薯 说:好可爱;
2025-04-25 17:37:05薯薯 说:脑补出来认真营业了一天等待主人发放奖励的小猫咪;
2025-04-25 17:37:25薯宝 说:敷衍营业bushi;
2025-04-25 17:37:54漂移菌 说:回复麦麦。的消息(罐罐拿来),说:猫娘我昨晚上太饿吃完了;
--- 以上消息已读 (标记时间: 2025-04-25 17:37:54) ---
--- 以下新消息未读---
2025-04-25 17:38:29麦麦(你) 说:那今晚的猫条是不是也要被克扣了(盯——);
2025-04-25 17:38:29麦麦(你) 说:[表达了:幽默,自嘲,无奈,父子关系,编程笑话];
你现在当前心情:平静。
现在请你生成你的内心想法,要求思考群里正在进行的话题,之前大家聊过的话题,群里成员的关系。请你思考,要不要对群里的话题进行回复,以及如何对群聊内容进行回复
回复的要求是:不要总是重复自己提到过的话题,如果你要回复,最好只回复一个人的一个话题
如果最后一条消息是你自己发的,观察到的内容只有你自己的发言,并且之后没有人回复你,不要回复。如果聊天记录中最新的消息是你自己发送的,并且你还想继续回复,你应该紧紧衔接你发送的消息,进行话题的深入,补充,或追问等等。请注意不要输出多余内容(包括前后缀,冒号和引号,括号, 表情,等),不要回复自己的发言
现在请你先输出想法,生成你在这个聊天中的想法,在原来的想法上尝试新的话题,不要分点输出,文字不要浮夸在输出完想法后,请你思考应该使用什么工具。工具可以帮你取得一些你不知道的信息,或者进行一些操作。如果你需要做某件事,来对消息和你的回复进行处理,请使用工具。"""
# 发送不带工具调用的请求
response, reasoning_content = await llm_model.generate_response_async(prompt)
# 输出生成的内容
print("\n 生成的内容:")
print(f" {response[:200]}..." if len(response) > 200 else f" {response}")
result_info = {"内容": response, "推理内容": reasoning_content, "工具调用": []}
return result_info
async def run_alternating_tests(iterations=5):
"""
交替运行两种测试方法,每种方法运行指定次数
参数:
iterations: 每种测试方法运行的次数
返回:
包含两种测试方法结果的元组
"""
print(f"开始交替测试(每种方法{iterations}次)...")
# 初始化结果列表
times_without_tools = []
times_with_tools = []
responses_without_tools = []
responses_with_tools = []
for i in range(iterations):
print(f"\n{i + 1}/{iterations} 轮交替测试")
# 不使用工具的测试
print("\n 执行不使用工具调用的测试...")
start_time = time.time()
response = await test_without_tool_calls()
end_time = time.time()
elapsed = end_time - start_time
times_without_tools.append(elapsed)
responses_without_tools.append(response)
print(f" - 耗时: {elapsed:.2f}")
# 使用工具的测试
print("\n 执行使用工具调用的测试...")
start_time = time.time()
response = await test_with_tool_calls()
end_time = time.time()
elapsed = end_time - start_time
times_with_tools.append(elapsed)
responses_with_tools.append(response)
print(f" - 耗时: {elapsed:.2f}")
# 计算统计数据
results_without_tools = {
"平均耗时": statistics.mean(times_without_tools),
"最短耗时": min(times_without_tools),
"最长耗时": max(times_without_tools),
"标准差": statistics.stdev(times_without_tools) if len(times_without_tools) > 1 else 0,
"所有耗时": times_without_tools,
"响应结果": responses_without_tools,
}
results_with_tools = {
"平均耗时": statistics.mean(times_with_tools),
"最短耗时": min(times_with_tools),
"最长耗时": max(times_with_tools),
"标准差": statistics.stdev(times_with_tools) if len(times_with_tools) > 1 else 0,
"所有耗时": times_with_tools,
"响应结果": responses_with_tools,
}
return results_without_tools, results_with_tools
async def main():
"""主测试函数"""
print("=" * 50)
print("LLM工具调用与普通请求性能比较测试")
print("=" * 50)
# 设置测试迭代次数
iterations = 10
# 执行交替测试
results_without_tools, results_with_tools = await run_alternating_tests(iterations)
# 显示结果比较
print("\n" + "=" * 50)
print("测试结果比较")
print("=" * 50)
print("\n不使用工具调用:")
for key, value in results_without_tools.items():
if key == "所有耗时":
print(f" {key}: {[f'{t:.2f}' for t in value]}")
elif key == "响应结果":
print(f" {key}: [内容已省略,详见结果文件]")
else:
print(f" {key}: {value:.2f}")
print("\n使用工具调用:")
for key, value in results_with_tools.items():
if key == "所有耗时":
print(f" {key}: {[f'{t:.2f}' for t in value]}")
elif key == "响应结果":
tool_calls_counts = [len(res.get("工具调用", [])) for res in value]
print(f" {key}: [内容已省略,详见结果文件]")
print(f" 工具调用数量: {tool_calls_counts}")
else:
print(f" {key}: {value:.2f}")
# 计算差异百分比
diff_percent = ((results_with_tools["平均耗时"] / results_without_tools["平均耗时"]) - 1) * 100
print(f"\n工具调用比普通请求平均耗时相差: {diff_percent:.2f}%")
# 保存结果到JSON文件
results = {
"测试时间": time.strftime("%Y-%m-%d %H:%M:%S"),
"测试迭代次数": iterations,
"不使用工具调用": {
k: (v if k != "所有耗时" else [float(f"{t:.2f}") for t in v])
for k, v in results_without_tools.items()
if k != "响应结果"
},
"不使用工具调用_详细响应": [
{
"内容摘要": resp["内容"][:200] + "..." if len(resp["内容"]) > 200 else resp["内容"],
"推理内容摘要": resp["推理内容"][:200] + "..." if len(resp["推理内容"]) > 200 else resp["推理内容"],
}
for resp in results_without_tools["响应结果"]
],
"使用工具调用": {
k: (v if k != "所有耗时" else [float(f"{t:.2f}") for t in v])
for k, v in results_with_tools.items()
if k != "响应结果"
},
"使用工具调用_详细响应": [
{
"内容摘要": resp["内容"][:200] + "..." if len(resp["内容"]) > 200 else resp["内容"],
"推理内容摘要": resp["推理内容"][:200] + "..." if len(resp["推理内容"]) > 200 else resp["推理内容"],
"工具调用数量": len(resp["工具调用"]),
"工具调用详情": [
{"工具名称": tool["function"]["name"], "参数": tool["function"].get("arguments", {})}
for tool in resp["工具调用"]
],
}
for resp in results_with_tools["响应结果"]
],
"差异百分比": float(f"{diff_percent:.2f}"),
}
with open("llm_tool_benchmark_results.json", "w", encoding="utf-8") as f:
json.dump(results, f, ensure_ascii=False, indent=2)
print("\n测试结果已保存到 llm_tool_benchmark_results.json")
if __name__ == "__main__":
asyncio.run(main())