🤖 自动格式化代码 [skip ci]

This commit is contained in:
github-actions[bot]
2025-06-16 14:10:49 +00:00
parent f6215cd560
commit ee005456ea
16 changed files with 461 additions and 383 deletions

View File

@@ -1,62 +1,59 @@
import tkinter as tk
from tkinter import ttk, colorchooser, messagebox, filedialog
from tkinter import ttk, messagebox, filedialog
import json
from pathlib import Path
import threading
import queue
import time
import toml
from datetime import datetime
import bisect
from collections import defaultdict
import os
class LogIndex:
"""日志索引,用于快速检索和过滤"""
def __init__(self):
self.entries = [] # 所有日志条目
self.module_index = defaultdict(list) # 按模块索引
self.level_index = defaultdict(list) # 按级别索引
self.filtered_indices = [] # 当前过滤结果的索引
self.total_entries = 0
def add_entry(self, index, entry):
"""添加日志条目到索引"""
if index >= len(self.entries):
self.entries.extend([None] * (index - len(self.entries) + 1))
self.entries[index] = entry
self.total_entries = max(self.total_entries, index + 1)
# 更新各种索引
logger_name = entry.get("logger_name", "")
level = entry.get("level", "")
self.module_index[logger_name].append(index)
self.level_index[level].append(index)
def filter_entries(self, modules=None, level=None, search_text=None):
"""根据条件过滤日志条目"""
if not modules and not level and not search_text:
self.filtered_indices = list(range(self.total_entries))
return self.filtered_indices
candidate_indices = set(range(self.total_entries))
# 模块过滤
if modules and "全部" not in modules:
module_indices = set()
for module in modules:
module_indices.update(self.module_index.get(module, []))
candidate_indices &= module_indices
# 级别过滤
if level and level != "全部":
level_indices = set(self.level_index.get(level, []))
candidate_indices &= level_indices
# 文本搜索过滤
if search_text:
search_text = search_text.lower()
@@ -68,14 +65,14 @@ class LogIndex:
if search_text in text_content:
text_indices.add(i)
candidate_indices &= text_indices
self.filtered_indices = sorted(list(candidate_indices))
return self.filtered_indices
def get_filtered_count(self):
"""获取过滤后的条目数量"""
return len(self.filtered_indices)
def get_entry_at_filtered_position(self, position):
"""获取过滤结果中指定位置的条目"""
if 0 <= position < len(self.filtered_indices):
@@ -242,16 +239,16 @@ class LogFormatter:
class VirtualLogDisplay:
"""虚拟滚动日志显示组件"""
def __init__(self, parent, formatter):
self.parent = parent
self.formatter = formatter
self.line_height = 20 # 每行高度(像素)
self.visible_lines = 30 # 可见行数
# 创建主框架
self.main_frame = ttk.Frame(parent)
# 创建文本框和滚动条
self.scrollbar = ttk.Scrollbar(self.main_frame)
self.scrollbar.pack(side=tk.RIGHT, fill=tk.Y)
@@ -264,24 +261,24 @@ class VirtualLogDisplay:
foreground="#ffffff",
insertbackground="#ffffff",
selectbackground="#404040",
font=("Consolas", 10)
font=("Consolas", 10),
)
self.text_widget.pack(side=tk.LEFT, fill=tk.BOTH, expand=True)
self.scrollbar.config(command=self.text_widget.yview)
# 配置文本标签样式
self.configure_text_tags()
# 数据源
self.log_index = None
self.current_page = 0
self.page_size = 500 # 每页显示条数
self.max_display_lines = 2000 # 最大显示行数
def pack(self, **kwargs):
"""包装pack方法"""
self.main_frame.pack(**kwargs)
def configure_text_tags(self):
"""配置文本标签样式"""
# 基础标签
@@ -289,78 +286,84 @@ class VirtualLogDisplay:
self.text_widget.tag_configure("level", foreground="#808080")
self.text_widget.tag_configure("module", foreground="#808080")
self.text_widget.tag_configure("message", foreground="#ffffff")
# 日志级别颜色标签
for level, color in self.formatter.level_colors.items():
self.text_widget.tag_configure(f"level_{level}", foreground=color)
# 模块颜色标签
for module, color in self.formatter.module_colors.items():
self.text_widget.tag_configure(f"module_{module}", foreground=color)
def set_log_index(self, log_index):
"""设置日志索引数据源"""
self.log_index = log_index
self.current_page = 0
self.refresh_display()
def refresh_display(self):
"""刷新显示"""
if not self.log_index:
self.text_widget.delete(1.0, tk.END)
return
# 清空显示
self.text_widget.delete(1.0, tk.END)
# 批量加载和显示日志
total_count = self.log_index.get_filtered_count()
if total_count == 0:
self.text_widget.insert(tk.END, "没有符合条件的日志记录\n")
return
# 计算显示范围
start_index = 0
end_index = min(total_count, self.max_display_lines)
# 批量处理和显示
batch_size = 100
for batch_start in range(start_index, end_index, batch_size):
batch_end = min(batch_start + batch_size, end_index)
self.display_batch(batch_start, batch_end)
# 让UI有机会响应
self.parent.update_idletasks()
# 滚动到底部(如果需要)
self.text_widget.see(tk.END)
def display_batch(self, start_index, end_index):
"""批量显示日志条目"""
batch_text = []
batch_tags = []
for i in range(start_index, end_index):
log_entry = self.log_index.get_entry_at_filtered_position(i)
if log_entry:
parts, tags = self.formatter.format_log_entry(log_entry)
# 合并部分为单行文本
line_text = " ".join(parts) + "\n"
batch_text.append(line_text)
# 记录标签信息(简化处理)
if tags and self.formatter.enable_level_colors:
level = log_entry.get("level", "info")
batch_tags.append(("line", len("".join(batch_text)) - len(line_text),
len("".join(batch_text)) - 1, f"level_{level}"))
batch_tags.append(
(
"line",
len("".join(batch_text)) - len(line_text),
len("".join(batch_text)) - 1,
f"level_{level}",
)
)
# 一次性插入所有文本
if batch_text:
start_pos = self.text_widget.index(tk.END)
all_text = "".join(batch_text)
self.text_widget.insert(tk.END, all_text)
# 应用标签(可选,为了性能可以考虑简化)
for tag_info in batch_tags:
try:
@@ -372,35 +375,35 @@ class VirtualLogDisplay:
class AsyncLogLoader:
"""异步日志加载器"""
def __init__(self, callback):
self.callback = callback
self.loading = False
self.should_stop = False
def load_file_async(self, file_path, progress_callback=None):
"""异步加载日志文件"""
if self.loading:
return
self.loading = True
self.should_stop = False
def load_worker():
try:
log_index = LogIndex()
if not os.path.exists(file_path):
self.callback(log_index, "文件不存在")
return
file_size = os.path.getsize(file_path)
processed_size = 0
with open(file_path, "r", encoding="utf-8") as f:
line_count = 0
batch_size = 1000 # 批量处理
while not self.should_stop:
lines = []
for _ in range(batch_size):
@@ -408,11 +411,11 @@ class AsyncLogLoader:
if not line:
break
lines.append(line)
processed_size += len(line.encode('utf-8'))
processed_size += len(line.encode("utf-8"))
if not lines:
break
# 处理这批数据
for line in lines:
try:
@@ -421,24 +424,24 @@ class AsyncLogLoader:
line_count += 1
except json.JSONDecodeError:
continue
# 更新进度
if progress_callback:
progress = min(100, (processed_size / file_size) * 100)
progress_callback(progress, line_count)
if not self.should_stop:
self.callback(log_index, None)
except Exception as e:
self.callback(None, str(e))
finally:
self.loading = False
thread = threading.Thread(target=load_worker)
thread.daemon = True
thread.start()
def stop_loading(self):
"""停止加载"""
self.should_stop = True
@@ -462,7 +465,7 @@ class LogViewer:
# 初始化异步加载器
self.async_loader = AsyncLogLoader(self.on_file_loaded)
# 初始化日志索引
self.log_index = LogIndex()
@@ -472,7 +475,7 @@ class LogViewer:
# 创建控制面板
self.create_control_panel()
# 创建虚拟滚动日志显示区域
self.log_display = VirtualLogDisplay(self.main_frame, self.formatter)
self.log_display.pack(fill=tk.BOTH, expand=True)
@@ -503,9 +506,9 @@ class LogViewer:
self.default_config = {
"log": {"date_style": "m-d H:i:s", "log_level_style": "lite", "color_text": "full"},
}
self.log_config = self.default_config["log"].copy()
config_path = Path("config/bot_config.toml")
try:
if config_path.exists():
@@ -576,20 +579,20 @@ class LogViewer:
def on_file_loaded(self, log_index, error):
"""文件加载完成回调"""
self.progress_bar.pack_forget()
if error:
self.status_var.set(f"加载失败: {error}")
messagebox.showerror("错误", f"加载日志文件失败: {error}")
return
self.log_index = log_index
self.status_var.set(f"已加载 {log_index.total_entries} 条日志")
# 更新模块列表
self.modules = set(log_index.module_index.keys())
module_values = ["全部"] + sorted(list(self.modules))
self.module_combo["values"] = module_values
# 应用过滤并显示
self.filter_logs()
@@ -607,22 +610,19 @@ class LogViewer:
if not self.current_log_file.exists():
self.status_var.set("文件不存在")
return
# 显示进度条
self.progress_bar.pack(side=tk.LEFT, padx=5, pady=2, before=self.status_label)
self.progress_var.set(0)
self.status_var.set("正在加载...")
# 清空当前数据
self.log_index = LogIndex()
self.modules.clear()
self.selected_modules.clear()
# 开始异步加载
self.async_loader.load_file_async(
str(self.current_log_file),
self.on_loading_progress
)
self.async_loader.load_file_async(str(self.current_log_file), self.on_loading_progress)
def on_module_selected(self, event=None):
"""模块选择事件"""
@@ -637,18 +637,18 @@ class LogViewer:
"""过滤日志"""
if not self.log_index:
return
# 获取过滤条件
selected_modules = self.selected_modules if self.selected_modules else None
level = self.level_var.get() if self.level_var.get() != "全部" else None
search_text = self.search_var.get().strip() if self.search_var.get().strip() else None
# 应用过滤
self.log_index.filter_entries(selected_modules, level, search_text)
# 更新显示
self.log_display.set_log_index(self.log_index)
# 更新状态
filtered_count = self.log_index.get_filtered_count()
total_count = self.log_index.total_entries
@@ -683,4 +683,4 @@ def main():
if __name__ == "__main__":
main()
main()