class ExpressionViewer:
    """Tkinter window for browsing expression JSON files.

    Each file is expected to hold a JSON list of objects with the keys
    ``situation``, ``style`` and ``count``.  The window offers a text search,
    sorting, grouping, and a similarity graph built from TF-IDF cosine
    similarity between the ``situation``/``style`` texts.

    Attributes:
        current_data: Entries of the loaded file, kept in file order.  Sorting
            and filtering always work on copies so that the row/column indices
            of ``similarity_matrix`` keep matching this list.
        similarity_matrix: Pairwise cosine similarities for ``current_data``,
            or ``None`` when they could not be computed.
        graph: ``networkx.Graph`` with one node per entry and one edge per
            pair whose similarity exceeds the slider threshold.
        canvas: The embedded matplotlib canvas, or ``None`` when hidden.
    """

    # Directory (relative to the working directory) scanned for JSON files.
    EXPRESSION_DIR = "data/expression"

    def __init__(self, root):
        """Build all widgets inside *root* and load the first file found."""
        self.root = root
        self.root.title("表达方式预览器")
        self.root.geometry("1200x800")

        # State first, so that no widget callback can observe a missing attribute.
        self.current_data = []
        self.similarity_matrix = None
        self.graph = nx.Graph()
        self.canvas = None

        self.main_frame = ttk.Frame(root)
        self.main_frame.pack(fill=tk.BOTH, expand=True, padx=10, pady=10)

        self._build_control_panel()
        self._build_content_area()

        self.load_file_list()

    # ------------------------------------------------------------------
    # Widget construction
    # ------------------------------------------------------------------
    def _build_control_panel(self):
        """Create the left-hand panel: search, file choice, sort/group/similarity options."""
        self.control_frame = ttk.Frame(self.main_frame)
        self.control_frame.pack(side=tk.LEFT, fill=tk.Y, padx=(0, 10))

        # Search row.  The label is packed before the entry so it sits to its left.
        self.search_frame = ttk.Frame(self.control_frame)
        self.search_frame.pack(fill=tk.X, pady=(0, 10))
        ttk.Label(self.search_frame, text="搜索:").pack(side=tk.LEFT, padx=(0, 5))

        self.search_var = tk.StringVar()
        self.search_var.trace_add("write", self.filter_expressions)
        self.search_entry = ttk.Entry(self.search_frame, textvariable=self.search_var)
        self.search_entry.pack(side=tk.LEFT, fill=tk.X, expand=True)

        # File selector.  "<<ComboboxSelected>>" is the virtual event ttk
        # raises when the user picks an entry from the drop-down list.
        self.file_var = tk.StringVar()
        self.file_combo = ttk.Combobox(self.search_frame, textvariable=self.file_var)
        self.file_combo.pack(side=tk.LEFT, padx=5)
        self.file_combo.bind("<<ComboboxSelected>>", self.load_file)

        # Sort options.
        self.sort_frame = ttk.LabelFrame(self.control_frame, text="排序选项")
        self.sort_frame.pack(fill=tk.X, pady=5)
        self.sort_var = tk.StringVar(value="count")
        sort_choices = (
            ("按计数排序", "count"),
            ("按情境排序", "situation"),
            ("按风格排序", "style"),
        )
        for text, value in sort_choices:
            ttk.Radiobutton(
                self.sort_frame,
                text=text,
                variable=self.sort_var,
                value=value,
                command=self.apply_sort,
            ).pack(anchor=tk.W)

        # Grouping options.
        self.group_frame = ttk.LabelFrame(self.control_frame, text="分群选项")
        self.group_frame.pack(fill=tk.X, pady=5)
        self.group_var = tk.StringVar(value="none")
        group_choices = (
            ("不分群", "none"),
            ("按情境分群", "situation"),
            ("按风格分群", "style"),
        )
        for text, value in group_choices:
            ttk.Radiobutton(
                self.group_frame,
                text=text,
                variable=self.group_var,
                value=value,
                command=self.apply_grouping,
            ).pack(anchor=tk.W)

        # Similarity threshold.  The label is created before the scale so the
        # scale callback can always update it; pack order decides the layout.
        self.similarity_frame = ttk.LabelFrame(self.control_frame, text="相似度设置")
        self.similarity_frame.pack(fill=tk.X, pady=5)
        self.similarity_var = tk.DoubleVar(value=0.5)
        self.similarity_label = ttk.Label(
            self.similarity_frame,
            text=f"相似度阈值: {self.similarity_var.get():.2f}",
        )
        self.similarity_scale = ttk.Scale(
            self.similarity_frame,
            from_=0.0,
            to=1.0,
            variable=self.similarity_var,
            orient=tk.HORIZONTAL,
            command=self.update_similarity,
        )
        self.similarity_scale.pack(fill=tk.X, padx=5, pady=5)
        self.similarity_label.pack()

        # Display options.
        self.view_frame = ttk.LabelFrame(self.control_frame, text="显示选项")
        self.view_frame.pack(fill=tk.X, pady=5)
        self.show_graph_var = tk.BooleanVar(value=True)
        ttk.Checkbutton(
            self.view_frame,
            text="显示关系图",
            variable=self.show_graph_var,
            command=self.toggle_graph,
        ).pack(anchor=tk.W)

    def _build_content_area(self):
        """Create the right-hand area: scrollable text on top, graph below."""
        self.content_frame = ttk.Frame(self.main_frame)
        self.content_frame.pack(side=tk.LEFT, fill=tk.BOTH, expand=True)

        # Text and scrollbar are siblings inside their own frame, so the
        # scrollbar does not cover part of the text.
        text_frame = ttk.Frame(self.content_frame)
        text_frame.pack(side=tk.TOP, fill=tk.BOTH, expand=True)
        scrollbar = ttk.Scrollbar(text_frame, orient=tk.VERTICAL)
        scrollbar.pack(side=tk.RIGHT, fill=tk.Y)
        self.text_area = tk.Text(text_frame, wrap=tk.WORD, yscrollcommand=scrollbar.set)
        self.text_area.pack(side=tk.LEFT, fill=tk.BOTH, expand=True)
        scrollbar.config(command=self.text_area.yview)

        self.graph_frame = ttk.Frame(self.content_frame)
        self.graph_frame.pack(side=tk.TOP, fill=tk.BOTH, expand=True)

    # ------------------------------------------------------------------
    # Pure helpers (no widget access)
    # ------------------------------------------------------------------
    @staticmethod
    def _find_json_files(base_dir):
        """Return the sorted paths of all ``.json`` files under *base_dir*, relative to it.

        A missing directory yields an empty list because ``os.walk`` ignores
        listing errors by default.
        """
        found = []
        for folder, _, filenames in os.walk(base_dir):
            for filename in filenames:
                if filename.endswith(".json"):
                    found.append(os.path.relpath(os.path.join(folder, filename), base_dir))
        return sorted(found)

    @staticmethod
    def _read_expressions(file_path):
        """Load *file_path* and return its entries as a list of dicts.

        Raises:
            OSError: The file cannot be opened or read.
            ValueError: The content is not valid UTF-8/JSON, or the top-level
                JSON value is not a list.
        """
        with open(file_path, "r", encoding="utf-8") as f:
            data = json.load(f)
        if not isinstance(data, list):
            raise ValueError("文件内容不是表达方式列表")
        # Entries that are not objects cannot be displayed; drop them.
        return [item for item in data if isinstance(item, dict)]

    @staticmethod
    def _filter_items(items, search_text):
        """Return the entries whose situation or style contains *search_text* (case-insensitive)."""
        needle = search_text.lower()
        if not needle:
            return list(items)
        return [
            item
            for item in items
            if needle in str(item.get("situation", "")).lower()
            or needle in str(item.get("style", "")).lower()
        ]

    @staticmethod
    def _sort_items(items, sort_key):
        """Return a sorted copy of *items*.

        ``"count"`` sorts numerically, highest first; a missing or non-numeric
        count is treated as 0 so mixed data never raises.  Any other key sorts
        ascending by the string value of that field.
        """
        if sort_key == "count":

            def count_of(item):
                try:
                    return float(item.get("count", 0))
                except (TypeError, ValueError):
                    return 0.0

            return sorted(items, key=count_of, reverse=True)
        return sorted(items, key=lambda item: str(item.get(sort_key) or ""))

    @staticmethod
    def _group_items(items, group_key):
        """Bucket *items* by ``item[group_key]``, preserving first-seen group order.

        Entries without the key go to the ``"未分类"`` group.
        """
        groups = defaultdict(list)
        for item in items:
            groups[str(item.get(group_key, "未分类"))].append(item)
        return dict(groups)

    @staticmethod
    def _format_item(item):
        """Return the multi-line text block shown for one entry."""
        return (
            f"情境: {item.get('situation', 'N/A')}\n"
            f"风格: {item.get('style', 'N/A')}\n"
            f"计数: {item.get('count', 'N/A')}\n"
            + "-" * 50
            + "\n"
        )

    # ------------------------------------------------------------------
    # Loading
    # ------------------------------------------------------------------
    def load_file_list(self):
        """Fill the file selector and load the first file, if there is one."""
        files = self._find_json_files(self.EXPRESSION_DIR)
        self.file_combo["values"] = files
        if files:
            self.file_combo.set(files[0])
            self.load_file(None)

    def load_file(self, event):
        """Load the file currently named in the selector and refresh text and graph.

        *event* is the Tk event object (or ``None`` when called directly); it
        is not used.  On failure the error is shown in the text area and any
        previously loaded data is discarded.
        """
        selected_file = self.file_var.get()
        if not selected_file:
            return

        file_path = os.path.join(self.EXPRESSION_DIR, selected_file)
        try:
            self.current_data = self._read_expressions(file_path)
        except (OSError, ValueError) as e:
            # Do not keep showing or graphing the previous file's entries.
            self.current_data = []
            self.similarity_matrix = None
            self.graph.clear()
            self._clear_canvas()
            self.text_area.delete("1.0", tk.END)
            self.text_area.insert(tk.END, f"加载文件时出错: {str(e)}")
            return

        self._compute_similarity()
        self._refresh_view()
        self.update_similarity()

    # ------------------------------------------------------------------
    # Text view
    # ------------------------------------------------------------------
    def _refresh_view(self):
        """Re-render the text area from the current search, sort and group settings."""
        visible = self._filter_items(self.current_data, self.search_var.get())
        visible = self._sort_items(visible, self.sort_var.get())

        group_key = self.group_var.get()
        if group_key == "none":
            self.display_data(visible)
            return

        chunks = []
        for group, members in self._group_items(visible, group_key).items():
            chunks.append(f"\n=== {group} ===\n\n")
            chunks.extend(self._format_item(member) for member in members)
        self.text_area.delete("1.0", tk.END)
        self.text_area.insert(tk.END, "".join(chunks))

    def apply_sort(self):
        """Callback for the sort radio buttons."""
        self._refresh_view()

    def apply_grouping(self):
        """Callback for the grouping radio buttons."""
        self._refresh_view()

    def filter_expressions(self, *args):
        """Callback for changes of the search text; *args* are the Tk trace arguments."""
        self._refresh_view()

    def display_data(self, data):
        """Replace the text area content with a flat listing of *data*."""
        self.text_area.delete("1.0", tk.END)
        self.text_area.insert(tk.END, "".join(self._format_item(item) for item in data))

    # ------------------------------------------------------------------
    # Similarity graph
    # ------------------------------------------------------------------
    def _compute_similarity(self):
        """Compute and cache the pairwise similarity matrix for ``current_data``.

        Done once per loaded file; moving the threshold slider only re-reads
        the cached matrix.
        """
        self.similarity_matrix = None
        if len(self.current_data) < 2:
            return

        texts = [
            f"{item.get('situation', '')} {item.get('style', '')}"
            for item in self.current_data
        ]
        try:
            tfidf_matrix = TfidfVectorizer().fit_transform(texts)
        except ValueError:
            # Raised by scikit-learn when no text yields a token (empty
            # vocabulary); the graph then simply has no edges.
            return
        self.similarity_matrix = cosine_similarity(tfidf_matrix)

    def update_similarity(self, *args):
        """Rebuild the graph for the current threshold and redraw it if visible.

        Also used as the slider callback; *args* (the new slider value passed
        by Tk) are ignored in favour of ``similarity_var``.
        """
        threshold = self.similarity_var.get()
        self.similarity_label.config(text=f"相似度阈值: {threshold:.2f}")

        self.graph.clear()
        if not self.current_data:
            self._clear_canvas()
            return

        for index, item in enumerate(self.current_data):
            label = f"{item.get('situation', '')}\n{item.get('style', '')}"
            self.graph.add_node(index, label=label)

        matrix = self.similarity_matrix
        if matrix is not None:
            total = len(self.current_data)
            for i in range(total):
                for j in range(i + 1, total):
                    score = matrix[i, j]
                    if score > threshold:
                        self.graph.add_edge(i, j, weight=float(score))

        if self.show_graph_var.get():
            self.draw_graph()

    def _clear_canvas(self):
        """Remove the embedded graph canvas, if any."""
        if self.canvas is not None:
            self.canvas.get_tk_widget().destroy()
            self.canvas = None

    def draw_graph(self):
        """Draw ``self.graph`` into a fresh canvas inside the graph frame."""
        # A plain Figure is used instead of pyplot: every plt.figure() stays
        # registered with pyplot (and gets its own figure manager) until it is
        # explicitly closed, so creating one per redraw accumulates figures.
        from matplotlib.figure import Figure

        self._clear_canvas()

        fig = Figure(figsize=(8, 6))
        ax = fig.add_subplot(111)
        pos = nx.spring_layout(self.graph)

        nx.draw_networkx_nodes(
            self.graph, pos, ax=ax, node_color="lightblue", node_size=1000, alpha=0.6
        )
        nx.draw_networkx_edges(self.graph, pos, ax=ax, alpha=0.4)
        labels = nx.get_node_attributes(self.graph, "label")
        nx.draw_networkx_labels(self.graph, pos, labels, ax=ax, font_size=8)

        ax.set_title("表达方式关系图")
        ax.set_axis_off()

        self.canvas = FigureCanvasTkAgg(fig, master=self.graph_frame)
        self.canvas.draw()
        self.canvas.get_tk_widget().pack(fill=tk.BOTH, expand=True)

    def toggle_graph(self):
        """Callback for the "show graph" checkbox."""
        if self.show_graph_var.get():
            self.draw_graph()
        else:
            self._clear_canvas()


def main():
    """Create the Tk root window and run the viewer until it is closed."""
    root = tk.Tk()
    ExpressionViewer(root)
    root.mainloop()


if __name__ == "__main__":
    main()
--- a/src/individuality/expression_style.py +++ b/src/individuality/expression_style.py @@ -6,6 +6,7 @@ from src.chat.utils.prompt_builder import Prompt, global_prompt_manager from typing import List, Tuple import os import json +from datetime import datetime logger = get_logger("expressor") @@ -45,11 +46,30 @@ class PersonalityExpression: if os.path.exists(self.meta_file_path): try: with open(self.meta_file_path, "r", encoding="utf-8") as f: - return json.load(f) + meta_data = json.load(f) + # 检查是否有last_update_time字段 + if "last_update_time" not in meta_data: + logger.warning(f"{self.meta_file_path} 中缺少last_update_time字段,将重新开始。") + # 清空并重写元数据文件 + self._write_meta_data({"last_style_text": None, "count": 0, "last_update_time": None}) + # 清空并重写表达文件 + if os.path.exists(self.expressions_file_path): + with open(self.expressions_file_path, "w", encoding="utf-8") as f: + json.dump([], f, ensure_ascii=False, indent=2) + logger.debug(f"已清空表达文件: {self.expressions_file_path}") + return {"last_style_text": None, "count": 0, "last_update_time": None} + return meta_data except json.JSONDecodeError: logger.warning(f"无法解析 {self.meta_file_path} 中的JSON数据,将重新开始。") - return {"last_style_text": None, "count": 0} - return {"last_style_text": None, "count": 0} + # 清空并重写元数据文件 + self._write_meta_data({"last_style_text": None, "count": 0, "last_update_time": None}) + # 清空并重写表达文件 + if os.path.exists(self.expressions_file_path): + with open(self.expressions_file_path, "w", encoding="utf-8") as f: + json.dump([], f, ensure_ascii=False, indent=2) + logger.debug(f"已清空表达文件: {self.expressions_file_path}") + return {"last_style_text": None, "count": 0, "last_update_time": None} + return {"last_style_text": None, "count": 0, "last_update_time": None} def _write_meta_data(self, data): os.makedirs(os.path.dirname(self.meta_file_path), exist_ok=True) @@ -84,7 +104,7 @@ class PersonalityExpression: if count >= self.max_calculations: logger.debug(f"对于风格 '{current_style_text}' 已达到最大计算次数 
({self.max_calculations})。跳过提取。") # 即使跳过,也更新元数据以反映当前风格已被识别且计数已满 - self._write_meta_data({"last_style_text": current_style_text, "count": count}) + self._write_meta_data({"last_style_text": current_style_text, "count": count, "last_update_time": meta_data.get("last_update_time")}) return # 构建prompt @@ -99,30 +119,63 @@ class PersonalityExpression: except Exception as e: logger.error(f"个性表达方式提取失败: {e}") # 如果提取失败,保存当前的风格和未增加的计数 - self._write_meta_data({"last_style_text": current_style_text, "count": count}) + self._write_meta_data({"last_style_text": current_style_text, "count": count, "last_update_time": meta_data.get("last_update_time")}) return logger.info(f"个性表达方式提取response: {response}") # chat_id用personality - expressions = self.parse_expression_response(response, "personality") + # 转为dict并count=100 - result = [] - for _, situation, style in expressions: - result.append({"situation": situation, "style": style, "count": 100}) - # 超过50条时随机删除多余的,只保留50条 - if len(result) > 50: - remove_count = len(result) - 50 - remove_indices = set(random.sample(range(len(result)), remove_count)) - result = [item for idx, item in enumerate(result) if idx not in remove_indices] + if response != "": + expressions = self.parse_expression_response(response, "personality") + # 读取已有的表达方式 + existing_expressions = [] + if os.path.exists(self.expressions_file_path): + try: + with open(self.expressions_file_path, "r", encoding="utf-8") as f: + existing_expressions = json.load(f) + except (json.JSONDecodeError, FileNotFoundError): + logger.warning(f"无法读取或解析 {self.expressions_file_path},将创建新的表达文件。") + + # 创建新的表达方式 + new_expressions = [] + for _, situation, style in expressions: + new_expressions.append({"situation": situation, "style": style, "count": 1}) + + # 合并表达方式,如果situation和style相同则累加count + merged_expressions = existing_expressions.copy() + for new_expr in new_expressions: + found = False + for existing_expr in merged_expressions: + if (existing_expr["situation"] == new_expr["situation"] 
and + existing_expr["style"] == new_expr["style"]): + existing_expr["count"] += new_expr["count"] + found = True + break + if not found: + merged_expressions.append(new_expr) + + # 超过50条时随机删除多余的,只保留50条 + if len(merged_expressions) > 50: + remove_count = len(merged_expressions) - 50 + remove_indices = set(random.sample(range(len(merged_expressions)), remove_count)) + merged_expressions = [item for idx, item in enumerate(merged_expressions) if idx not in remove_indices] - with open(self.expressions_file_path, "w", encoding="utf-8") as f: - json.dump(result, f, ensure_ascii=False, indent=2) - logger.info(f"已写入{len(result)}条表达到{self.expressions_file_path}") + with open(self.expressions_file_path, "w", encoding="utf-8") as f: + json.dump(merged_expressions, f, ensure_ascii=False, indent=2) + logger.info(f"已写入{len(merged_expressions)}条表达到{self.expressions_file_path}") - # 成功提取后更新元数据 - count += 1 - self._write_meta_data({"last_style_text": current_style_text, "count": count}) - logger.info(f"成功处理。风格 '{current_style_text}' 的计数现在是 {count}。") + # 成功提取后更新元数据 + count += 1 + current_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S") + self._write_meta_data({ + "last_style_text": current_style_text, + "count": count, + "last_update_time": current_time + }) + logger.info(f"成功处理。风格 '{current_style_text}' 的计数现在是 {count},最后更新时间:{current_time}。") + else: + logger.warning(f"个性表达方式提取失败,模型返回空内容: {response}") def parse_expression_response(self, response: str, chat_id: str) -> List[Tuple[str, str, str]]: """