Files
Mofox-Core/src/gui/reasoning_gui.py
2025-04-08 15:31:13 +09:00

343 lines
14 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

# import os
# import queue
# import sys
# import threading
# import time
# from datetime import datetime
# from typing import Dict, List
# from typing import Optional
# sys.path.insert(0, sys.path[0] + "/../")
# sys.path.insert(0, sys.path[0] + "/../")
# from src.common.logger import get_module_logger
# import customtkinter as ctk
# from dotenv import load_dotenv
# logger = get_module_logger("gui")
# # 获取当前文件的目录
# current_dir = os.path.dirname(os.path.abspath(__file__))
# # 获取项目根目录
# root_dir = os.path.abspath(os.path.join(current_dir, "..", ".."))
# sys.path.insert(0, root_dir)
# from src.common.database import db # noqa: E402
# # 加载环境变量
# if os.path.exists(os.path.join(root_dir, ".env.dev")):
# load_dotenv(os.path.join(root_dir, ".env.dev"))
# logger.info("成功加载开发环境配置")
# elif os.path.exists(os.path.join(root_dir, ".env")):
# load_dotenv(os.path.join(root_dir, ".env"))
# logger.info("成功加载生产环境配置")
# else:
# logger.error("未找到环境配置文件")
# sys.exit(1)
# class ReasoningGUI:
# def __init__(self):
# # 记录启动时间戳转换为Unix时间戳
# self.start_timestamp = datetime.now().timestamp()
# logger.info(f"程序启动时间戳: {self.start_timestamp}")
# # 设置主题
# ctk.set_appearance_mode("dark")
# ctk.set_default_color_theme("blue")
# # 创建主窗口
# self.root = ctk.CTk()
# self.root.title("麦麦推理")
# self.root.geometry("800x600")
# self.root.protocol("WM_DELETE_WINDOW", self._on_closing)
# # 存储群组数据
# self.group_data: Dict[str, List[dict]] = {}
# # 创建更新队列
# self.update_queue = queue.Queue()
# # 创建主框架
# self.frame = ctk.CTkFrame(self.root)
# self.frame.pack(pady=20, padx=20, fill="both", expand=True)
# # 添加标题
# self.title = ctk.CTkLabel(self.frame, text="麦麦的脑内所想", font=("Arial", 24))
# self.title.pack(pady=10, padx=10)
# # 创建左右分栏
# self.paned = ctk.CTkFrame(self.frame)
# self.paned.pack(fill="both", expand=True, padx=10, pady=10)
# # 左侧群组列表
# self.left_frame = ctk.CTkFrame(self.paned, width=200)
# self.left_frame.pack(side="left", fill="y", padx=5, pady=5)
# self.group_label = ctk.CTkLabel(self.left_frame, text="群组列表", font=("Arial", 16))
# self.group_label.pack(pady=5)
# # 创建可滚动框架来容纳群组按钮
# self.group_scroll_frame = ctk.CTkScrollableFrame(self.left_frame, width=180, height=400)
# self.group_scroll_frame.pack(pady=5, padx=5, fill="both", expand=True)
# # 存储群组按钮的字典
# self.group_buttons: Dict[str, ctk.CTkButton] = {}
# # 当前选中的群组ID
# self.selected_group_id: Optional[str] = None
# # 右侧内容显示
# self.right_frame = ctk.CTkFrame(self.paned)
# self.right_frame.pack(side="right", fill="both", expand=True, padx=5, pady=5)
# self.content_label = ctk.CTkLabel(self.right_frame, text="推理内容", font=("Arial", 16))
# self.content_label.pack(pady=5)
# # 创建富文本显示框
# self.content_text = ctk.CTkTextbox(self.right_frame, width=500, height=400)
# self.content_text.pack(pady=5, padx=5, fill="both", expand=True)
# # 配置文本标签 - 只使用颜色
# self.content_text.tag_config("timestamp", foreground="#888888") # 时间戳使用灰色
# self.content_text.tag_config("user", foreground="#4CAF50") # 用户名使用绿色
# self.content_text.tag_config("message", foreground="#2196F3") # 消息使用蓝色
# self.content_text.tag_config("model", foreground="#9C27B0") # 模型名称使用紫色
# self.content_text.tag_config("prompt", foreground="#FF9800") # prompt内容使用橙色
# self.content_text.tag_config("reasoning", foreground="#FF9800") # 推理过程使用橙色
# self.content_text.tag_config("response", foreground="#E91E63") # 回复使用粉色
# self.content_text.tag_config("separator", foreground="#666666") # 分隔符使用深灰色
# # 底部控制栏
# self.control_frame = ctk.CTkFrame(self.frame)
# self.control_frame.pack(fill="x", padx=10, pady=5)
# self.clear_button = ctk.CTkButton(self.control_frame, text="清除显示", command=self.clear_display, width=120)
# self.clear_button.pack(side="left", padx=5)
# # 添加标志标记GUI是否已关闭
# self.is_running = True
# # 启动自动更新线程
# self.update_thread = threading.Thread(target=self._auto_update, daemon=True)
# self.update_thread.start()
# # 启动GUI更新检查
# self.root.after(100, self._process_queue)
# def _on_closing(self):
# """处理窗口关闭事件"""
# # 标记GUI已关闭防止后台线程继续访问tkinter对象
# self.is_running = False
# # 安全清理所有可能的tkinter变量
# for attr_name in list(self.__dict__.keys()):
# if isinstance(getattr(self, attr_name), (ctk.Variable, ctk.StringVar, ctk.IntVar, ctk.DoubleVar, ctk.BooleanVar)):
# # 删除变量前安全地将其设置为None
# try:
# var = getattr(self, attr_name)
# var.set(None)
# except Exception:
# pass
# setattr(self, attr_name, None)
# # 退出
# self.root.quit()
# sys.exit(0)
# def _process_queue(self):
# """处理更新队列中的任务"""
# try:
# while True:
# task = self.update_queue.get_nowait()
# if task["type"] == "update_group_list":
# self._update_group_list_gui()
# elif task["type"] == "update_display":
# self._update_display_gui(task["group_id"])
# except queue.Empty:
# pass
# finally:
# # 继续检查队列但仅在GUI仍在运行时
# if self.is_running:
# self.root.after(100, self._process_queue)
# def _update_group_list_gui(self):
# """在主线程中更新群组列表"""
# # 清除现有按钮
# for button in self.group_buttons.values():
# button.destroy()
# self.group_buttons.clear()
# # 创建新的群组按钮
# for group_id in self.group_data.keys():
# button = ctk.CTkButton(
# self.group_scroll_frame,
# text=f"群号: {group_id}",
# width=160,
# height=30,
# corner_radius=8,
# command=lambda gid=group_id: self._on_group_select(gid),
# )
# button.pack(pady=2, padx=5)
# self.group_buttons[group_id] = button
# # 如果有选中的群组,保持其高亮状态
# if self.selected_group_id and self.selected_group_id in self.group_buttons:
# self._highlight_selected_group(self.selected_group_id)
# def _on_group_select(self, group_id: str):
# """处理群组选择事件"""
# self._highlight_selected_group(group_id)
# self._update_display_gui(group_id)
# def _highlight_selected_group(self, group_id: str):
# """高亮显示选中的群组按钮"""
# # 重置所有按钮的颜色
# for gid, button in self.group_buttons.items():
# if gid == group_id:
# # 设置选中按钮的颜色
# button.configure(fg_color="#1E88E5", hover_color="#1976D2")
# else:
# # 恢复其他按钮的默认颜色
# button.configure(fg_color="#2B2B2B", hover_color="#404040")
# self.selected_group_id = group_id
# def _update_display_gui(self, group_id: str):
# """在主线程中更新显示内容"""
# if group_id in self.group_data:
# self.content_text.delete("1.0", "end")
# for item in self.group_data[group_id]:
# # 时间戳
# time_str = item["time"].strftime("%Y-%m-%d %H:%M:%S")
# self.content_text.insert("end", f"[{time_str}]\n", "timestamp")
# # 用户信息
# self.content_text.insert("end", "用户: ", "timestamp")
# self.content_text.insert("end", f"{item.get('user', '未知')}\n", "user")
# # 消息内容
# self.content_text.insert("end", "消息: ", "timestamp")
# self.content_text.insert("end", f"{item.get('message', '')}\n", "message")
# # 模型信息
# self.content_text.insert("end", "模型: ", "timestamp")
# self.content_text.insert("end", f"{item.get('model', '')}\n", "model")
# # Prompt内容
# self.content_text.insert("end", "Prompt内容:\n", "timestamp")
# prompt_text = item.get("prompt", "")
# if prompt_text and prompt_text.lower() != "none":
# lines = prompt_text.split("\n")
# for line in lines:
# if line.strip():
# self.content_text.insert("end", " " + line + "\n", "prompt")
# else:
# self.content_text.insert("end", " 无Prompt内容\n", "prompt")
# # 推理过程
# self.content_text.insert("end", "推理过程:\n", "timestamp")
# reasoning_text = item.get("reasoning", "")
# if reasoning_text and reasoning_text.lower() != "none":
# lines = reasoning_text.split("\n")
# for line in lines:
# if line.strip():
# self.content_text.insert("end", " " + line + "\n", "reasoning")
# else:
# self.content_text.insert("end", " 无推理过程\n", "reasoning")
# # 回复内容
# self.content_text.insert("end", "回复: ", "timestamp")
# self.content_text.insert("end", f"{item.get('response', '')}\n", "response")
# # 分隔符
# self.content_text.insert("end", f"\n{'=' * 50}\n\n", "separator")
# # 滚动到顶部
# self.content_text.see("1.0")
# def _auto_update(self):
# """自动更新函数"""
# while True:
# if not self.is_running:
# break # 如果GUI已关闭停止线程
# try:
# # 从数据库获取最新数据,只获取启动时间之后的记录
# query = {"time": {"$gt": self.start_timestamp}}
# logger.debug(f"查询条件: {query}")
# # 先获取一条记录检查时间格式
# sample = db.reasoning_logs.find_one()
# if sample:
# logger.debug(f"样本记录时间格式: {type(sample['time'])} 值: {sample['time']}")
# cursor = db.reasoning_logs.find(query).sort("time", -1)
# new_data = {}
# total_count = 0
# for item in cursor:
# # 调试输出
# if total_count == 0:
# logger.debug(f"记录时间: {item['time']}, 类型: {type(item['time'])}")
# total_count += 1
# group_id = str(item.get("group_id", "unknown"))
# if group_id not in new_data:
# new_data[group_id] = []
# # 转换时间戳为datetime对象
# if isinstance(item["time"], (int, float)):
# time_obj = datetime.fromtimestamp(item["time"])
# elif isinstance(item["time"], datetime):
# time_obj = item["time"]
# else:
# logger.warning(f"未知的时间格式: {type(item['time'])}")
# time_obj = datetime.now() # 使用当前时间作为后备
# new_data[group_id].append(
# {
# "time": time_obj,
# "user": item.get("user", "未知"),
# "message": item.get("message", ""),
# "model": item.get("model", "未知"),
# "reasoning": item.get("reasoning", ""),
# "response": item.get("response", ""),
# "prompt": item.get("prompt", ""), # 添加prompt字段
# }
# )
# logger.info(f"从数据库加载了 {total_count} 条记录,分布在 {len(new_data)} 个群组中")
# # 更新数据
# if new_data != self.group_data:
# self.group_data = new_data
# logger.info("数据已更新,正在刷新显示...")
# # 将更新任务添加到队列
# self.update_queue.put({"type": "update_group_list"})
# if self.group_data:
# # 如果没有选中的群组,选择最新的群组
# if not self.selected_group_id or self.selected_group_id not in self.group_data:
# self.selected_group_id = next(iter(self.group_data))
# self.update_queue.put({"type": "update_display", "group_id": self.selected_group_id})
# except Exception:
# logger.exception("自动更新出错")
# # 每5秒更新一次
# time.sleep(5)
# def clear_display(self):
# """清除显示内容"""
# self.content_text.delete("1.0", "end")
# def run(self):
# """运行GUI"""
# self.root.mainloop()
# def main():
# app = ReasoningGUI()
# app.run()
# if __name__ == "__main__":
# main()