feat:优化了log查看器

This commit is contained in:
SengokuCola
2025-06-12 20:29:27 +08:00
parent 745d762dbb
commit 4abb077388
2 changed files with 590 additions and 255 deletions

View File

@@ -1,10 +1,192 @@
import tkinter as tk import tkinter as tk
from tkinter import ttk, colorchooser from tkinter import ttk, colorchooser, messagebox, filedialog
import json import json
from pathlib import Path from pathlib import Path
import threading import threading
import queue import queue
import time import time
import toml
from datetime import datetime
class LogFormatter:
"""日志格式化器同步logger.py的格式"""
def __init__(self, config, custom_module_colors=None, custom_level_colors=None):
self.config = config
# 日志级别颜色
self.level_colors = {
"debug": "#FFA500", # 橙色
"info": "#0000FF", # 蓝色
"success": "#008000", # 绿色
"warning": "#FFFF00", # 黄色
"error": "#FF0000", # 红色
"critical": "#800080", # 紫色
}
# 模块颜色映射 - 同步logger.py中的MODULE_COLORS
self.module_colors = {
"api": "#00FF00", # 亮绿色
"emoji": "#00FF00", # 亮绿色
"chat": "#0080FF", # 亮蓝色
"config": "#FFFF00", # 亮黄色
"common": "#FF00FF", # 亮紫色
"tools": "#00FFFF", # 亮青色
"lpmm": "#00FFFF", # 亮青色
"plugin_system": "#FF0080", # 亮红色
"experimental": "#FFFFFF", # 亮白色
"person_info": "#008000", # 绿色
"individuality": "#000080", # 蓝色
"manager": "#800080", # 紫色
"llm_models": "#008080", # 青色
"plugins": "#800000", # 红色
"plugin_api": "#808000", # 黄色
"remote": "#8000FF", # 紫蓝色
}
# 应用自定义颜色
if custom_module_colors:
self.module_colors.update(custom_module_colors)
if custom_level_colors:
self.level_colors.update(custom_level_colors)
# 根据配置决定颜色启用状态
color_text = self.config.get("color_text", "full")
if color_text == "none":
self.enable_colors = False
self.enable_module_colors = False
self.enable_level_colors = False
elif color_text == "title":
self.enable_colors = True
self.enable_module_colors = True
self.enable_level_colors = False
elif color_text == "full":
self.enable_colors = True
self.enable_module_colors = True
self.enable_level_colors = True
else:
self.enable_colors = True
self.enable_module_colors = True
self.enable_level_colors = False
def format_log_entry(self, log_entry):
"""格式化日志条目,返回格式化后的文本和样式标签"""
# 获取基本信息
timestamp = log_entry.get("timestamp", "")
level = log_entry.get("level", "info")
logger_name = log_entry.get("logger_name", "")
event = log_entry.get("event", "")
# 格式化时间戳
formatted_timestamp = self.format_timestamp(timestamp)
# 构建输出部分
parts = []
tags = []
# 日志级别样式配置
log_level_style = self.config.get("log_level_style", "lite")
# 时间戳
if formatted_timestamp:
if log_level_style == "lite" and self.enable_level_colors:
# lite模式下时间戳按级别着色
parts.append(formatted_timestamp)
tags.append(f"level_{level}")
else:
parts.append(formatted_timestamp)
tags.append("timestamp")
# 日志级别显示
if log_level_style == "full":
# 显示完整级别名
level_text = f"[{level.upper():>8}]"
parts.append(level_text)
if self.enable_level_colors:
tags.append(f"level_{level}")
else:
tags.append("level")
elif log_level_style == "compact":
# 只显示首字母
level_text = f"[{level.upper()[0]:>8}]"
parts.append(level_text)
if self.enable_level_colors:
tags.append(f"level_{level}")
else:
tags.append("level")
# lite模式不显示级别
# 模块名称
if logger_name:
module_text = f"[{logger_name}]"
parts.append(module_text)
if self.enable_module_colors:
tags.append(f"module_{logger_name}")
else:
tags.append("module")
# 消息内容
if isinstance(event, str):
parts.append(event)
elif isinstance(event, dict):
try:
parts.append(json.dumps(event, ensure_ascii=False, indent=None))
except (TypeError, ValueError):
parts.append(str(event))
else:
parts.append(str(event))
tags.append("message")
# 处理其他字段
extras = []
for key, value in log_entry.items():
if key not in ("timestamp", "level", "logger_name", "event"):
if isinstance(value, (dict, list)):
try:
value_str = json.dumps(value, ensure_ascii=False, indent=None)
except (TypeError, ValueError):
value_str = str(value)
else:
value_str = str(value)
extras.append(f"{key}={value_str}")
if extras:
parts.append(" ".join(extras))
tags.append("extras")
return parts, tags
def format_timestamp(self, timestamp):
"""格式化时间戳"""
if not timestamp:
return ""
try:
# 尝试解析ISO格式时间戳
if 'T' in timestamp:
dt = datetime.fromisoformat(timestamp.replace('Z', '+00:00'))
else:
# 假设已经是格式化的字符串
return timestamp
# 根据配置格式化
date_style = self.config.get("date_style", "m-d H:i:s")
format_map = {
'Y': '%Y', # 4位年份
'm': '%m', # 月份01-12
'd': '%d', # 日期01-31
'H': '%H', # 小时00-23
'i': '%M', # 分钟00-59
's': '%S', # 秒数00-59
}
python_format = date_style
for php_char, python_char in format_map.items():
python_format = python_format.replace(php_char, python_char)
return dt.strftime(python_format)
except Exception:
return timestamp
class LogViewer: class LogViewer:
def __init__(self, root): def __init__(self, root):
@@ -12,10 +194,19 @@ class LogViewer:
self.root.title("MaiBot日志查看器") self.root.title("MaiBot日志查看器")
self.root.geometry("1200x800") self.root.geometry("1200x800")
# 加载配置
self.load_config()
# 初始化日志格式化器
self.formatter = LogFormatter(self.log_config, self.custom_module_colors, self.custom_level_colors)
# 创建主框架 # 创建主框架
self.main_frame = ttk.Frame(root) self.main_frame = ttk.Frame(root)
self.main_frame.pack(fill=tk.BOTH, expand=True, padx=5, pady=5) self.main_frame.pack(fill=tk.BOTH, expand=True, padx=5, pady=5)
# 创建菜单栏
self.create_menu()
# 创建控制面板 # 创建控制面板
self.control_frame = ttk.Frame(self.main_frame) self.control_frame = ttk.Frame(self.main_frame)
self.control_frame.pack(fill=tk.X, pady=(0, 5)) self.control_frame.pack(fill=tk.X, pady=(0, 5))
@@ -46,7 +237,7 @@ class LogViewer:
ttk.Label(level_frame, text="级别:").pack(side=tk.LEFT, padx=2) ttk.Label(level_frame, text="级别:").pack(side=tk.LEFT, padx=2)
self.level_var = tk.StringVar(value="全部") self.level_var = tk.StringVar(value="全部")
self.level_combo = ttk.Combobox(level_frame, textvariable=self.level_var, width=8) self.level_combo = ttk.Combobox(level_frame, textvariable=self.level_var, width=8)
self.level_combo['values'] = ['全部', 'info', 'warning', 'error'] self.level_combo['values'] = ['全部', 'debug', 'info', 'warning', 'error', 'critical']
self.level_combo.pack(side=tk.LEFT, padx=2) self.level_combo.pack(side=tk.LEFT, padx=2)
# 搜索框 # 搜索框
@@ -65,17 +256,14 @@ class LogViewer:
self.scrollbar = ttk.Scrollbar(self.log_frame) self.scrollbar = ttk.Scrollbar(self.log_frame)
self.scrollbar.pack(side=tk.RIGHT, fill=tk.Y) self.scrollbar.pack(side=tk.RIGHT, fill=tk.Y)
self.log_text = tk.Text(self.log_frame, wrap=tk.WORD, yscrollcommand=self.scrollbar.set) self.log_text = tk.Text(self.log_frame, wrap=tk.WORD, yscrollcommand=self.scrollbar.set,
background='#1e1e1e', foreground='#ffffff',
insertbackground='#ffffff', selectbackground='#404040')
self.log_text.pack(side=tk.LEFT, fill=tk.BOTH, expand=True) self.log_text.pack(side=tk.LEFT, fill=tk.BOTH, expand=True)
self.scrollbar.config(command=self.log_text.yview) self.scrollbar.config(command=self.log_text.yview)
# 设置默认标签颜色 # 配置文本标签样式
self.colors = { self.configure_text_tags()
'info': 'black',
'warning': 'orange',
'error': 'red'
}
self.module_colors = {}
# 模块名映射 # 模块名映射
self.module_name_mapping = { self.module_name_mapping = {
@@ -130,6 +318,335 @@ class LogViewer:
self.update_thread.daemon = True self.update_thread.daemon = True
self.update_thread.start() self.update_thread.start()
def load_config(self):
"""加载配置文件"""
# 默认配置
self.default_config = {
"log": {
"date_style": "m-d H:i:s",
"log_level_style": "lite",
"color_text": "full",
"log_level": "INFO"
},
"viewer": {
"theme": "dark",
"font_size": 10,
"max_lines": 1000,
"auto_scroll": True,
"show_milliseconds": False,
"window": {
"width": 1200,
"height": 800,
"remember_position": True
}
}
}
# 从bot_config.toml加载日志配置
config_path = Path("config/bot_config.toml")
self.log_config = self.default_config["log"].copy()
self.viewer_config = self.default_config["viewer"].copy()
try:
if config_path.exists():
with open(config_path, 'r', encoding='utf-8') as f:
bot_config = toml.load(f)
if 'log' in bot_config:
self.log_config.update(bot_config['log'])
except Exception as e:
print(f"加载bot配置失败: {e}")
# 从viewer配置文件加载查看器配置
viewer_config_path = Path("config/log_viewer_config.toml")
self.custom_module_colors = {}
self.custom_level_colors = {}
try:
if viewer_config_path.exists():
with open(viewer_config_path, 'r', encoding='utf-8') as f:
viewer_config = toml.load(f)
if 'viewer' in viewer_config:
self.viewer_config.update(viewer_config['viewer'])
# 加载自定义模块颜色
if 'module_colors' in viewer_config['viewer']:
self.custom_module_colors = viewer_config['viewer']['module_colors']
# 加载自定义级别颜色
if 'level_colors' in viewer_config['viewer']:
self.custom_level_colors = viewer_config['viewer']['level_colors']
if 'log' in viewer_config:
self.log_config.update(viewer_config['log'])
except Exception as e:
print(f"加载查看器配置失败: {e}")
# 应用窗口配置
window_config = self.viewer_config.get('window', {})
window_width = window_config.get('width', 1200)
window_height = window_config.get('height', 800)
self.root.geometry(f"{window_width}x{window_height}")
def save_viewer_config(self):
"""保存查看器配置"""
# 准备完整的配置数据
viewer_config_copy = self.viewer_config.copy()
# 保存自定义颜色(只保存与默认值不同的颜色)
if self.custom_module_colors:
viewer_config_copy["module_colors"] = self.custom_module_colors
if self.custom_level_colors:
viewer_config_copy["level_colors"] = self.custom_level_colors
config_data = {
"log": self.log_config,
"viewer": viewer_config_copy
}
config_path = Path("config/log_viewer_config.toml")
config_path.parent.mkdir(exist_ok=True)
try:
with open(config_path, 'w', encoding='utf-8') as f:
toml.dump(config_data, f)
except Exception as e:
print(f"保存查看器配置失败: {e}")
def create_menu(self):
"""创建菜单栏"""
menubar = tk.Menu(self.root)
self.root.config(menu=menubar)
# 配置菜单
config_menu = tk.Menu(menubar, tearoff=0)
menubar.add_cascade(label="配置", menu=config_menu)
config_menu.add_command(label="日志格式设置", command=self.show_format_settings)
config_menu.add_command(label="颜色设置", command=self.show_color_settings)
config_menu.add_command(label="查看器设置", command=self.show_viewer_settings)
config_menu.add_separator()
config_menu.add_command(label="重新加载配置", command=self.reload_config)
# 工具菜单
tools_menu = tk.Menu(menubar, tearoff=0)
menubar.add_cascade(label="工具", menu=tools_menu)
tools_menu.add_command(label="清空日志显示", command=self.clear_log_display)
tools_menu.add_command(label="导出当前日志", command=self.export_logs)
def show_format_settings(self):
"""显示格式设置窗口"""
format_window = tk.Toplevel(self.root)
format_window.title("日志格式设置")
format_window.geometry("400x300")
frame = ttk.Frame(format_window)
frame.pack(fill=tk.BOTH, expand=True, padx=10, pady=10)
# 日期格式
ttk.Label(frame, text="日期格式:").pack(anchor='w', pady=2)
date_style_var = tk.StringVar(value=self.log_config.get("date_style", "m-d H:i:s"))
date_entry = ttk.Entry(frame, textvariable=date_style_var, width=30)
date_entry.pack(anchor='w', pady=2)
ttk.Label(frame, text="格式说明: Y=年份, m=月份, d=日期, H=小时, i=分钟, s=秒",
font=('', 8)).pack(anchor='w', pady=2)
# 日志级别样式
ttk.Label(frame, text="日志级别样式:").pack(anchor='w', pady=(10,2))
level_style_var = tk.StringVar(value=self.log_config.get("log_level_style", "lite"))
level_frame = ttk.Frame(frame)
level_frame.pack(anchor='w', pady=2)
ttk.Radiobutton(level_frame, text="简洁(lite)", variable=level_style_var,
value="lite").pack(side='left', padx=(0,10))
ttk.Radiobutton(level_frame, text="紧凑(compact)", variable=level_style_var,
value="compact").pack(side='left', padx=(0,10))
ttk.Radiobutton(level_frame, text="完整(full)", variable=level_style_var,
value="full").pack(side='left', padx=(0,10))
# 颜色文本设置
ttk.Label(frame, text="文本颜色设置:").pack(anchor='w', pady=(10,2))
color_text_var = tk.StringVar(value=self.log_config.get("color_text", "full"))
color_frame = ttk.Frame(frame)
color_frame.pack(anchor='w', pady=2)
ttk.Radiobutton(color_frame, text="无颜色(none)", variable=color_text_var,
value="none").pack(side='left', padx=(0,10))
ttk.Radiobutton(color_frame, text="仅标题(title)", variable=color_text_var,
value="title").pack(side='left', padx=(0,10))
ttk.Radiobutton(color_frame, text="全部(full)", variable=color_text_var,
value="full").pack(side='left', padx=(0,10))
# 按钮
button_frame = ttk.Frame(frame)
button_frame.pack(fill='x', pady=(20,0))
def apply_format():
self.log_config["date_style"] = date_style_var.get()
self.log_config["log_level_style"] = level_style_var.get()
self.log_config["color_text"] = color_text_var.get()
# 重新初始化格式化器
self.formatter = LogFormatter(self.log_config, self.custom_module_colors, self.custom_level_colors)
self.configure_text_tags()
# 保存配置
self.save_viewer_config()
# 重新过滤日志以应用新格式
self.filter_logs()
format_window.destroy()
ttk.Button(button_frame, text="应用", command=apply_format).pack(side='right', padx=(5,0))
ttk.Button(button_frame, text="取消", command=format_window.destroy).pack(side='right')
def show_viewer_settings(self):
"""显示查看器设置窗口"""
viewer_window = tk.Toplevel(self.root)
viewer_window.title("查看器设置")
viewer_window.geometry("350x250")
frame = ttk.Frame(viewer_window)
frame.pack(fill=tk.BOTH, expand=True, padx=10, pady=10)
# 主题设置
ttk.Label(frame, text="主题:").pack(anchor='w', pady=2)
theme_var = tk.StringVar(value=self.viewer_config.get("theme", "dark"))
theme_frame = ttk.Frame(frame)
theme_frame.pack(anchor='w', pady=2)
ttk.Radiobutton(theme_frame, text="深色", variable=theme_var, value="dark").pack(side='left', padx=(0,10))
ttk.Radiobutton(theme_frame, text="浅色", variable=theme_var, value="light").pack(side='left')
# 字体大小
ttk.Label(frame, text="字体大小:").pack(anchor='w', pady=(10,2))
font_size_var = tk.IntVar(value=self.viewer_config.get("font_size", 10))
font_size_spin = ttk.Spinbox(frame, from_=8, to=20, textvariable=font_size_var, width=10)
font_size_spin.pack(anchor='w', pady=2)
# 最大行数
ttk.Label(frame, text="最大显示行数:").pack(anchor='w', pady=(10,2))
max_lines_var = tk.IntVar(value=self.viewer_config.get("max_lines", 1000))
max_lines_spin = ttk.Spinbox(frame, from_=100, to=10000, increment=100,
textvariable=max_lines_var, width=10)
max_lines_spin.pack(anchor='w', pady=2)
# 自动滚动
auto_scroll_var = tk.BooleanVar(value=self.viewer_config.get("auto_scroll", True))
ttk.Checkbutton(frame, text="自动滚动到底部", variable=auto_scroll_var).pack(anchor='w', pady=(10,2))
# 按钮
button_frame = ttk.Frame(frame)
button_frame.pack(fill='x', pady=(20,0))
def apply_viewer_settings():
self.viewer_config["theme"] = theme_var.get()
self.viewer_config["font_size"] = font_size_var.get()
self.viewer_config["max_lines"] = max_lines_var.get()
self.viewer_config["auto_scroll"] = auto_scroll_var.get()
# 应用主题
self.apply_theme()
# 保存配置
self.save_viewer_config()
viewer_window.destroy()
ttk.Button(button_frame, text="应用", command=apply_viewer_settings).pack(side='right', padx=(5,0))
ttk.Button(button_frame, text="取消", command=viewer_window.destroy).pack(side='right')
def apply_theme(self):
"""应用主题设置"""
theme = self.viewer_config.get("theme", "dark")
font_size = self.viewer_config.get("font_size", 10)
if theme == "dark":
bg_color = '#1e1e1e'
fg_color = '#ffffff'
select_bg = '#404040'
else:
bg_color = '#ffffff'
fg_color = '#000000'
select_bg = '#c0c0c0'
self.log_text.config(
background=bg_color,
foreground=fg_color,
selectbackground=select_bg,
font=('Consolas', font_size)
)
# 重新配置标签样式
self.configure_text_tags()
def configure_text_tags(self):
"""配置文本标签样式"""
# 清除现有标签
for tag in self.log_text.tag_names():
if tag != 'sel':
self.log_text.tag_delete(tag)
# 基础标签
self.log_text.tag_configure("timestamp", foreground="#808080")
self.log_text.tag_configure("level", foreground="#808080")
self.log_text.tag_configure("module", foreground="#808080")
self.log_text.tag_configure("message", foreground=self.log_text.cget("foreground"))
self.log_text.tag_configure("extras", foreground="#808080")
# 日志级别颜色标签
for level, color in self.formatter.level_colors.items():
self.log_text.tag_configure(f"level_{level}", foreground=color)
# 模块颜色标签
for module, color in self.formatter.module_colors.items():
self.log_text.tag_configure(f"module_{module}", foreground=color)
def reload_config(self):
"""重新加载配置"""
self.load_config()
self.formatter = LogFormatter(self.log_config, self.custom_module_colors, self.custom_level_colors)
self.configure_text_tags()
self.apply_theme()
self.filter_logs()
def clear_log_display(self):
"""清空日志显示"""
self.log_text.delete(1.0, tk.END)
def export_logs(self):
"""导出当前显示的日志"""
filename = filedialog.asksaveasfilename(
defaultextension=".txt",
filetypes=[("文本文件", "*.txt"), ("所有文件", "*.*")]
)
if filename:
try:
with open(filename, 'w', encoding='utf-8') as f:
f.write(self.log_text.get(1.0, tk.END))
messagebox.showinfo("导出成功", f"日志已导出到: {filename}")
except Exception as e:
messagebox.showerror("导出失败", f"导出日志时出错: {e}")
def load_module_mapping(self):
"""加载自定义模块映射"""
mapping_file = Path("config/module_mapping.json")
if mapping_file.exists():
try:
with open(mapping_file, 'r', encoding='utf-8') as f:
custom_mapping = json.load(f)
self.module_name_mapping.update(custom_mapping)
except Exception as e:
print(f"加载模块映射失败: {e}")
def save_module_mapping(self):
"""保存自定义模块映射"""
mapping_file = Path("config/module_mapping.json")
mapping_file.parent.mkdir(exist_ok=True)
try:
with open(mapping_file, 'w', encoding='utf-8') as f:
json.dump(self.module_name_mapping, f, ensure_ascii=False, indent=2)
except Exception as e:
print(f"保存模块映射失败: {e}")
def show_color_settings(self): def show_color_settings(self):
"""显示颜色设置窗口""" """显示颜色设置窗口"""
color_window = tk.Toplevel(self.root) color_window = tk.Toplevel(self.root)
@@ -163,7 +680,7 @@ class LogViewer:
command=lambda l=level: self.choose_color(l)) command=lambda l=level: self.choose_color(l))
color_btn.pack(side=tk.RIGHT) color_btn.pack(side=tk.RIGHT)
# 显示当前颜色 # 显示当前颜色
color_label = ttk.Label(frame, text="", foreground=self.colors[level]) color_label = ttk.Label(frame, text="", foreground=self.formatter.level_colors[level])
color_label.pack(side=tk.RIGHT, padx=5) color_label.pack(side=tk.RIGHT, padx=5)
# 添加模块颜色设置 # 添加模块颜色设置
@@ -176,7 +693,7 @@ class LogViewer:
command=lambda m=module: self.choose_module_color(m)) command=lambda m=module: self.choose_module_color(m))
color_btn.pack(side=tk.RIGHT) color_btn.pack(side=tk.RIGHT)
# 显示当前颜色 # 显示当前颜色
color = self.module_colors.get(module, 'black') color = self.formatter.module_colors.get(module, 'black')
color_label = ttk.Label(frame, text="", foreground=color) color_label = ttk.Label(frame, text="", foreground=color)
color_label.pack(side=tk.RIGHT, padx=5) color_label.pack(side=tk.RIGHT, padx=5)
@@ -189,37 +706,24 @@ class LogViewer:
def choose_color(self, level): def choose_color(self, level):
"""选择日志级别颜色""" """选择日志级别颜色"""
color = colorchooser.askcolor(color=self.colors[level])[1] color = colorchooser.askcolor(color=self.formatter.level_colors[level])[1]
if color: if color:
self.colors[level] = color self.formatter.level_colors[level] = color
self.log_text.tag_configure(level, foreground=color) self.custom_level_colors[level] = color # 保存到自定义颜色
self.configure_text_tags()
self.save_viewer_config() # 自动保存配置
self.filter_logs() self.filter_logs()
def choose_module_color(self, module): def choose_module_color(self, module):
"""选择模块颜色""" """选择模块颜色"""
color = colorchooser.askcolor(color=self.module_colors.get(module, 'black'))[1] color = colorchooser.askcolor(color=self.formatter.module_colors.get(module, 'black'))[1]
if color: if color:
self.module_colors[module] = color self.formatter.module_colors[module] = color
self.log_text.tag_configure(f"module_{module}", foreground=color) self.custom_module_colors[module] = color # 保存到自定义颜色
# 更新模块列表中的颜色显示 self.configure_text_tags()
self.update_module_color_display(module, color) self.save_viewer_config() # 自动保存配置
self.filter_logs() self.filter_logs()
def update_module_color_display(self, module, color):
"""更新模块列表中的颜色显示"""
# 遍历模块框架中的所有子控件,找到对应模块的颜色标签并更新
for widget in self.module_inner_frame.winfo_children():
if isinstance(widget, ttk.Frame):
# 检查这个框架是否包含目标模块
for child in widget.winfo_children():
if isinstance(child, ttk.Checkbutton):
if child.cget('text') == module:
# 找到了对应的模块,更新其颜色标签
for sibling in widget.winfo_children():
if isinstance(sibling, ttk.Label) and sibling.cget('text') == '':
sibling.config(foreground=color)
return
def update_module_list(self): def update_module_list(self):
"""更新模块列表""" """更新模块列表"""
log_file = Path("logs/app.log.jsonl") log_file = Path("logs/app.log.jsonl")
@@ -259,7 +763,7 @@ class LogViewer:
all_check.pack(side=tk.LEFT) all_check.pack(side=tk.LEFT)
# 使用颜色标签替代按钮 # 使用颜色标签替代按钮
all_color = self.module_colors.get('全部', 'black') all_color = self.formatter.module_colors.get('全部', 'black')
all_color_label = ttk.Label(all_frame, text="", foreground=all_color, width=2, cursor="hand2") all_color_label = ttk.Label(all_frame, text="", foreground=all_color, width=2, cursor="hand2")
all_color_label.pack(side=tk.LEFT, padx=2) all_color_label.pack(side=tk.LEFT, padx=2)
all_color_label.bind('<Button-1>', lambda e: self.choose_module_color('全部')) all_color_label.bind('<Button-1>', lambda e: self.choose_module_color('全部'))
@@ -293,7 +797,7 @@ class LogViewer:
self.create_tooltip(check, full_tooltip) self.create_tooltip(check, full_tooltip)
# 使用颜色标签替代按钮 # 使用颜色标签替代按钮
color = self.module_colors.get(module, 'black') color = self.formatter.module_colors.get(module, 'black')
color_label = ttk.Label(frame, text="", foreground=color, width=2, cursor="hand2") color_label = ttk.Label(frame, text="", foreground=color, width=2, cursor="hand2")
color_label.pack(side=tk.LEFT, padx=2) color_label.pack(side=tk.LEFT, padx=2)
color_label.bind('<Button-1>', lambda e, m=module: self.choose_module_color(m)) color_label.bind('<Button-1>', lambda e, m=module: self.choose_module_color(m))
@@ -395,23 +899,53 @@ class LogViewer:
if not self.should_show_log(log_entry): if not self.should_show_log(log_entry):
return return
# 格式化日志 # 使用格式化器格式化日志
timestamp = log_entry.get('timestamp', '') parts, tags = self.formatter.format_log_entry(log_entry)
level = log_entry.get('level', 'info')
logger_name = log_entry.get('logger_name', '')
event = log_entry.get('event', '')
log_line = f"{timestamp} [{level}] {logger_name}: {event}\n"
# 在主线程中更新UI # 在主线程中更新UI
self.root.after(0, lambda: self.add_log_line(log_line, level, logger_name)) self.root.after(0, lambda: self.add_formatted_log_line(parts, tags, log_entry))
def add_log_line(self, line, level, logger_name): def add_formatted_log_line(self, parts, tags, log_entry):
"""添加日志行到文本框""" """添加格式化的日志行到文本框"""
self.log_text.insert(tk.END, line, (level, f"module_{logger_name}")) # 控制最大行数
# 只有在用户没有手动滚动时才自动滚动到底部 max_lines = self.viewer_config.get("max_lines", 1000)
if self.log_text.yview()[1] >= 0.99: current_lines = int(self.log_text.index('end-1c').split('.')[0])
self.log_text.see(tk.END)
if current_lines > max_lines:
# 删除前面的行
lines_to_delete = current_lines - max_lines + 100 # 一次删除多一些,减少频繁操作
self.log_text.delete(1.0, f"{lines_to_delete}.0")
# 插入格式化的文本
for i, part in enumerate(parts):
if i < len(tags):
tag = tags[i]
# 根据内容类型选择合适的标签
if tag.startswith("level_"):
if self.formatter.enable_level_colors:
self.log_text.insert(tk.END, part, tag)
else:
self.log_text.insert(tk.END, part, "level")
elif tag.startswith("module_"):
if self.formatter.enable_module_colors:
self.log_text.insert(tk.END, part, tag)
else:
self.log_text.insert(tk.END, part, "module")
else:
self.log_text.insert(tk.END, part, tag)
else:
self.log_text.insert(tk.END, part)
# 在部分之间添加空格(除了最后一个)
if i < len(parts) - 1:
self.log_text.insert(tk.END, " ")
self.log_text.insert(tk.END, "\n")
# 自动滚动
if self.viewer_config.get("auto_scroll", True):
if self.log_text.yview()[1] >= 0.99:
self.log_text.see(tk.END)
def should_show_log(self, log_entry): def should_show_log(self, log_entry):
"""检查日志是否应该显示""" """检查日志是否应该显示"""
@@ -447,41 +981,17 @@ class LogViewer:
# 重新显示所有符合条件的日志 # 重新显示所有符合条件的日志
for log_entry in self.log_cache: for log_entry in self.log_cache:
if self.should_show_log(log_entry): if self.should_show_log(log_entry):
timestamp = log_entry.get('timestamp', '') parts, tags = self.formatter.format_log_entry(log_entry)
level = log_entry.get('level', 'info') self.add_formatted_log_line(parts, tags, log_entry)
logger_name = log_entry.get('logger_name', '')
event = log_entry.get('event', '')
log_line = f"{timestamp} [{level}] {logger_name}: {event}\n"
self.log_text.insert(tk.END, log_line, (level, f"module_{logger_name}"))
# 恢复滚动位置 # 恢复滚动位置(如果不是自动滚动模式)
self.log_text.yview_moveto(scroll_position[0]) if not self.viewer_config.get("auto_scroll", True):
self.log_text.yview_moveto(scroll_position[0])
def get_display_name(self, module_name): def get_display_name(self, module_name):
"""获取模块的显示名称""" """获取模块的显示名称"""
return self.module_name_mapping.get(module_name, module_name) return self.module_name_mapping.get(module_name, module_name)
def load_module_mapping(self):
"""加载自定义模块映射"""
mapping_file = Path("config/module_mapping.json")
if mapping_file.exists():
try:
with open(mapping_file, 'r', encoding='utf-8') as f:
custom_mapping = json.load(f)
self.module_name_mapping.update(custom_mapping)
except Exception as e:
print(f"加载模块映射失败: {e}")
def save_module_mapping(self):
"""保存自定义模块映射"""
mapping_file = Path("config/module_mapping.json")
mapping_file.parent.mkdir(exist_ok=True)
try:
with open(mapping_file, 'w', encoding='utf-8') as f:
json.dump(self.module_name_mapping, f, ensure_ascii=False, indent=2)
except Exception as e:
print(f"保存模块映射失败: {e}")
def edit_module_mapping(self): def edit_module_mapping(self):
"""编辑模块映射""" """编辑模块映射"""
mapping_window = tk.Toplevel(self.root) mapping_window = tk.Toplevel(self.root)

View File

@@ -1,175 +0,0 @@
from src.manager.async_task_manager import AsyncTask
from src.common.logger import get_logger
from src.person_info.person_info import PersonInfoManager
from src.chat.utils.chat_message_builder import get_raw_msg_by_timestamp
from src.config.config import global_config
from src.chat.message_receive.chat_stream import get_chat_manager
import time
import random
from collections import defaultdict
from src.person_info.relationship_manager import get_relationship_manager
logger = get_logger("relation")
# 暂时弃用,改为实时更新
class ImpressionUpdateTask(AsyncTask):
def __init__(self):
super().__init__(
task_name="impression_update",
wait_before_start=60,
run_interval=global_config.relationship.build_relationship_interval,
)
async def run(self):
try:
# 获取最近的消息
current_time = int(time.time())
start_time = current_time - global_config.relationship.build_relationship_interval # 100分钟前
# 获取所有消息
messages = get_raw_msg_by_timestamp(timestamp_start=start_time, timestamp_end=current_time)
if not messages:
logger.info("没有找到需要处理的消息")
return
logger.info(f"获取到 {len(messages)} 条消息")
# 按chat_id分组消息
chat_messages = defaultdict(list)
for msg in messages:
chat_messages[msg["chat_id"]].append(msg)
logger.info(f"消息按聊天分组: {len(chat_messages)} 个聊天组")
# 处理每个聊天组
for chat_id, msgs in chat_messages.items():
# 获取chat_stream
if len(msgs) < 30:
logger.info(f"聊天组 {chat_id} 消息数小于30跳过处理")
continue
chat_stream = get_chat_manager().get_stream(chat_id)
if not chat_stream:
logger.warning(f"未找到聊天组 {chat_id} 的chat_stream跳过处理")
continue
# 找到bot的消息
bot_messages = [msg for msg in msgs if msg["user_nickname"] == global_config.bot.nickname]
if not bot_messages:
logger.info(f"聊天组 {chat_id} 没有bot消息跳过处理")
continue
# 按时间排序所有消息
sorted_messages = sorted(msgs, key=lambda x: x["time"])
# 找到第一条和最后一条bot消息
first_bot_msg = bot_messages[0]
last_bot_msg = bot_messages[-1]
# 获取第一条bot消息前15条消息
first_bot_index = sorted_messages.index(first_bot_msg)
start_index = max(0, first_bot_index - 25)
# 获取最后一条bot消息后15条消息
last_bot_index = sorted_messages.index(last_bot_msg)
end_index = min(len(sorted_messages), last_bot_index + 26)
# 获取相关消息
relevant_messages = sorted_messages[start_index:end_index]
# 统计用户发言权重
user_weights = defaultdict(lambda: {"weight": 0, "messages": []})
# 计算权重
for bot_msg in bot_messages:
bot_time = bot_msg["time"]
context_messages = [
msg for msg in relevant_messages if abs(msg["time"] - bot_time) <= 600
] # 前后10分钟
logger.debug(f"Bot消息 {bot_time} 的上下文消息数: {len(context_messages)}")
for msg in context_messages:
if msg["user_nickname"] == global_config.bot.nickname:
continue
person_id = PersonInfoManager.get_person_id(msg["chat_info_platform"], msg["user_id"])
if not person_id:
logger.warning(f"未找到用户 {msg['user_nickname']} 的person_id")
continue
# 在bot消息附近的发言权重加倍
if abs(msg["time"] - bot_time) <= 120: # 前后2分钟
user_weights[person_id]["weight"] += 2
logger.debug(f"用户 {msg['user_nickname']} 在bot消息附近发言权重+2")
else:
user_weights[person_id]["weight"] += 1
logger.debug(f"用户 {msg['user_nickname']} 发言,权重+1")
user_weights[person_id]["messages"].append(msg)
# 按权重排序
sorted_users = sorted(user_weights.items(), key=lambda x: x[1]["weight"], reverse=True)
logger.debug(
f"用户权重排序: {[(msg[1]['messages'][0]['user_nickname'], msg[1]['weight']) for msg in sorted_users]}"
)
# 选择最多5个用户
selected_users = []
if len(sorted_users) > 5:
# 使用权重作为概率进行随机选择,确保不重复
weights = [user[1]["weight"] for user in sorted_users]
total_weight = sum(weights)
# 计算每个用户的概率
probabilities = [w / total_weight for w in weights]
# 使用累积概率进行选择
selected_indices = []
remaining_indices = list(range(len(sorted_users)))
for _ in range(5):
if not remaining_indices:
break
# 计算剩余索引的累积概率
remaining_probs = [probabilities[i] for i in remaining_indices]
# 归一化概率
remaining_probs = [p / sum(remaining_probs) for p in remaining_probs]
# 选择索引
chosen_idx = random.choices(remaining_indices, weights=remaining_probs, k=1)[0]
selected_indices.append(chosen_idx)
remaining_indices.remove(chosen_idx)
selected_users = [sorted_users[i] for i in selected_indices]
logger.info(
f"开始进一步了解这些用户: {[msg[1]['messages'][0]['user_nickname'] for msg in selected_users]}"
)
else:
selected_users = sorted_users
logger.info(
f"开始进一步了解用户: {[msg[1]['messages'][0]['user_nickname'] for msg in selected_users]}"
)
relationship_manager = get_relationship_manager()
# 更新选中用户的印象
for person_id, data in selected_users:
user_nickname = data["messages"][0]["user_nickname"]
platform = data["messages"][0]["chat_info_platform"]
user_id = data["messages"][0]["user_id"]
cardname = data["messages"][0]["user_cardname"]
is_known = await relationship_manager.is_known_some_one(platform, user_id)
if not is_known:
logger.info(f"首次认识用户: {user_nickname}")
await relationship_manager.first_knowing_some_one(platform, user_id, user_nickname, cardname)
logger.info(f"开始更新用户 {user_nickname} 的印象")
await relationship_manager.update_person_impression(
person_id=person_id, timestamp=last_bot_msg["time"], bot_engaged_messages=relevant_messages
)
logger.debug("印象更新任务执行完成")
except Exception as e:
logger.exception(f"更新印象任务失败: {str(e)}")