# import customtkinter as ctk # import subprocess # import threading # import queue # import re # import os # import signal # from collections import deque # import sys # # 设置应用的外观模式和默认颜色主题 # ctk.set_appearance_mode("dark") # ctk.set_default_color_theme("blue") # class LogViewerApp(ctk.CTk): # """日志查看器应用的主类,继承自customtkinter的CTk类""" # def __init__(self): # """初始化日志查看器应用的界面和状态""" # super().__init__() # self.title("日志查看器") # self.geometry("1200x800") # # 标记GUI是否运行中 # self.is_running = True # # 程序关闭时的清理操作 # self.protocol("WM_DELETE_WINDOW", self._on_closing) # # 初始化进程、日志队列、日志数据等变量 # self.process = None # self.log_queue = queue.Queue() # self.log_data = deque(maxlen=10000) # 使用固定长度队列 # self.available_levels = set() # self.available_modules = set() # self.sorted_modules = [] # self.module_checkboxes = {} # 存储模块复选框的字典 # # 日志颜色配置 # self.color_config = { # "time": "#888888", # "DEBUG": "#2196F3", # "INFO": "#4CAF50", # "WARNING": "#FF9800", # "ERROR": "#F44336", # "module": "#D4D0AB", # "default": "#FFFFFF", # } # # 列可见性配置 # self.column_visibility = {"show_time": True, "show_level": True, "show_module": True} # # 选中的日志等级和模块 # self.selected_levels = set() # self.selected_modules = set() # # 创建界面组件并启动日志队列处理 # self.create_widgets() # self.after(100, self.process_log_queue) # def create_widgets(self): # """创建应用界面的各个组件""" # self.grid_columnconfigure(0, weight=1) # self.grid_rowconfigure(1, weight=1) # # 控制面板 # control_frame = ctk.CTkFrame(self) # control_frame.grid(row=0, column=0, sticky="ew", padx=10, pady=5) # self.start_btn = ctk.CTkButton(control_frame, text="启动", command=self.start_process) # self.start_btn.pack(side="left", padx=5) # self.stop_btn = ctk.CTkButton(control_frame, text="停止", command=self.stop_process, state="disabled") # self.stop_btn.pack(side="left", padx=5) # self.clear_btn = ctk.CTkButton(control_frame, text="清屏", command=self.clear_logs) # self.clear_btn.pack(side="left", padx=5) # column_filter_frame = ctk.CTkFrame(control_frame) # column_filter_frame.pack(side="left", padx=20) # self.time_check = ctk.CTkCheckBox(column_filter_frame, text="显示时间", command=self.refresh_logs) # self.time_check.pack(side="left", padx=5) # self.time_check.select() # self.level_check = ctk.CTkCheckBox(column_filter_frame, text="显示等级", command=self.refresh_logs) # self.level_check.pack(side="left", padx=5) # self.level_check.select() # self.module_check = ctk.CTkCheckBox(column_filter_frame, text="显示模块", command=self.refresh_logs) # self.module_check.pack(side="left", padx=5) # self.module_check.select() # # 筛选面板 # filter_frame = ctk.CTkFrame(self) # filter_frame.grid(row=0, column=1, rowspan=2, sticky="ns", padx=5) # ctk.CTkLabel(filter_frame, text="日志等级筛选").pack(pady=5) # self.level_scroll = ctk.CTkScrollableFrame(filter_frame, width=150, height=200) # self.level_scroll.pack(fill="both", expand=True, padx=5) # ctk.CTkLabel(filter_frame, text="模块筛选").pack(pady=5) # self.module_filter_entry = ctk.CTkEntry(filter_frame, placeholder_text="输入模块过滤词") # self.module_filter_entry.pack(pady=5) # self.module_filter_entry.bind("", self.update_module_filter) # self.module_scroll = ctk.CTkScrollableFrame(filter_frame, width=300, height=200) # self.module_scroll.pack(fill="both", expand=True, padx=5) # self.log_text = ctk.CTkTextbox(self, wrap="word") # self.log_text.grid(row=1, column=0, sticky="nsew", padx=10, pady=5) # self.init_text_tags() # def update_module_filter(self, event): # """根据模块过滤词更新模块复选框的显示""" # filter_text = self.module_filter_entry.get().strip().lower() # for module, checkbox in self.module_checkboxes.items(): # if filter_text in module.lower(): # checkbox.pack(anchor="w", padx=5, pady=2) # else: # checkbox.pack_forget() # def update_filters(self, level, module): # """更新日志等级和模块的筛选器""" # if level not in self.available_levels: # self.available_levels.add(level) # self.add_checkbox(self.level_scroll, level, "level") # module_key = self.get_module_key(module) # if module_key not in self.available_modules: # self.available_modules.add(module_key) # self.sorted_modules = sorted(self.available_modules, key=lambda x: x.lower()) # self.rebuild_module_checkboxes() # def rebuild_module_checkboxes(self): # """重新构建模块复选框""" # # 清空现有复选框 # for widget in self.module_scroll.winfo_children(): # widget.destroy() # self.module_checkboxes.clear() # # 重建排序后的复选框 # for module in self.sorted_modules: # self.add_checkbox(self.module_scroll, module, "module") # def add_checkbox(self, parent, text, type_): # """在指定父组件中添加复选框""" # def update_filter(): # current = cb.get() # if type_ == "level": # (self.selected_levels.add if current else self.selected_levels.discard)(text) # else: # (self.selected_modules.add if current else self.selected_modules.discard)(text) # self.refresh_logs() # cb = ctk.CTkCheckBox(parent, text=text, command=update_filter) # cb.select() # 初始选中 # # 手动同步初始状态到集合(关键修复) # if type_ == "level": # self.selected_levels.add(text) # else: # self.selected_modules.add(text) # if type_ == "module": # self.module_checkboxes[text] = cb # cb.pack(anchor="w", padx=5, pady=2) # return cb # def check_filter(self, entry): # """检查日志条目是否符合当前筛选条件""" # level_ok = not self.selected_levels or entry["level"] in self.selected_levels # module_key = self.get_module_key(entry["module"]) # module_ok = not self.selected_modules or module_key in self.selected_modules # return level_ok and module_ok # def init_text_tags(self): # """初始化日志文本的颜色标签""" # for tag, color in self.color_config.items(): # self.log_text.tag_config(tag, foreground=color) # self.log_text.tag_config("default", foreground=self.color_config["default"]) # def start_process(self): # """启动日志进程并开始读取输出""" # self.process = subprocess.Popen( # ["nb", "run"], # stdout=subprocess.PIPE, # stderr=subprocess.STDOUT, # text=True, # bufsize=1, # encoding="utf-8", # errors="ignore", # ) # self.start_btn.configure(state="disabled") # self.stop_btn.configure(state="normal") # threading.Thread(target=self.read_output, daemon=True).start() # def stop_process(self): # """停止日志进程并清理相关资源""" # if self.process: # try: # if hasattr(self.process, "pid"): # if os.name == "nt": # subprocess.run( # ["taskkill", "/F", "/T", "/PID", str(self.process.pid)], check=True, capture_output=True # ) # else: # os.killpg(os.getpgid(self.process.pid), signal.SIGTERM) # except (subprocess.CalledProcessError, ProcessLookupError, OSError) as e: # print(f"终止进程失败: {e}") # finally: # self.process = None # self.log_queue.queue.clear() # self.start_btn.configure(state="normal") # self.stop_btn.configure(state="disabled") # self.refresh_logs() # def read_output(self): # """读取日志进程的输出并放入队列""" # try: # while self.process and self.process.poll() is None and self.is_running: # line = self.process.stdout.readline() # if line: # self.log_queue.put(line) # else: # break # 避免空循环 # self.process.stdout.close() # 确保关闭文件描述符 # except ValueError: # 处理可能的I/O操作异常 # pass # def process_log_queue(self): # """处理日志队列中的日志条目""" # while not self.log_queue.empty(): # line = self.log_queue.get() # self.process_log_line(line) # # 仅在GUI仍在运行时继续处理队列 # if self.is_running: # self.after(100, self.process_log_queue) # def process_log_line(self, line): # """解析单行日志并更新日志数据和筛选器""" # match = re.match( # r"""^ # (?:(?P