fix: 修复人格表达生成太固定的问题
This commit is contained in:
265
scripts/preview_expressions.py
Normal file
265
scripts/preview_expressions.py
Normal file
@@ -0,0 +1,265 @@
|
|||||||
|
import tkinter as tk
|
||||||
|
from tkinter import ttk
|
||||||
|
import json
|
||||||
|
import os
|
||||||
|
from pathlib import Path
|
||||||
|
import networkx as nx
|
||||||
|
import matplotlib.pyplot as plt
|
||||||
|
from matplotlib.backends.backend_tkagg import FigureCanvasTkAgg
|
||||||
|
from sklearn.feature_extraction.text import TfidfVectorizer
|
||||||
|
from sklearn.metrics.pairwise import cosine_similarity
|
||||||
|
import numpy as np
|
||||||
|
from collections import defaultdict
|
||||||
|
|
||||||
|
class ExpressionViewer:
    """Tk GUI for browsing "expression style" JSON files.

    Scans ``data/expression`` for ``*.json`` files, displays each entry's
    situation / style / count, and supports live search, sorting, grouping,
    and a TF-IDF cosine-similarity relationship graph drawn with networkx
    and embedded via matplotlib's Tk canvas.
    """

    def __init__(self, root):
        self.root = root
        self.root.title("表达方式预览器")
        self.root.geometry("1200x800")

        # Main container.
        self.main_frame = ttk.Frame(root)
        self.main_frame.pack(fill=tk.BOTH, expand=True, padx=10, pady=10)

        # Left-hand control panel.
        self.control_frame = ttk.Frame(self.main_frame)
        self.control_frame.pack(side=tk.LEFT, fill=tk.Y, padx=(0, 10))

        # Search row. The label is packed BEFORE the entry so it appears on
        # its left; the original packed the entry first, which made the
        # "搜索:" label render on the right-hand side of the field.
        self.search_frame = ttk.Frame(self.control_frame)
        self.search_frame.pack(fill=tk.X, pady=(0, 10))

        ttk.Label(self.search_frame, text="搜索:").pack(side=tk.LEFT, padx=(0, 5))
        self.search_var = tk.StringVar()
        # NOTE: 'w' trace mode is the legacy spelling (trace_add("write", ...)
        # is the modern one) but it still works and matches the file's style.
        self.search_var.trace('w', self.filter_expressions)
        self.search_entry = ttk.Entry(self.search_frame, textvariable=self.search_var)
        self.search_entry.pack(side=tk.LEFT, fill=tk.X, expand=True)

        # File selector combobox.
        self.file_var = tk.StringVar()
        self.file_combo = ttk.Combobox(self.search_frame, textvariable=self.file_var)
        self.file_combo.pack(side=tk.LEFT, padx=5)
        self.file_combo.bind('<<ComboboxSelected>>', self.load_file)

        # Sort options.
        self.sort_frame = ttk.LabelFrame(self.control_frame, text="排序选项")
        self.sort_frame.pack(fill=tk.X, pady=5)

        self.sort_var = tk.StringVar(value="count")
        ttk.Radiobutton(self.sort_frame, text="按计数排序", variable=self.sort_var,
                        value="count", command=self.apply_sort).pack(anchor=tk.W)
        ttk.Radiobutton(self.sort_frame, text="按情境排序", variable=self.sort_var,
                        value="situation", command=self.apply_sort).pack(anchor=tk.W)
        ttk.Radiobutton(self.sort_frame, text="按风格排序", variable=self.sort_var,
                        value="style", command=self.apply_sort).pack(anchor=tk.W)

        # Grouping options.
        self.group_frame = ttk.LabelFrame(self.control_frame, text="分群选项")
        self.group_frame.pack(fill=tk.X, pady=5)

        self.group_var = tk.StringVar(value="none")
        ttk.Radiobutton(self.group_frame, text="不分群", variable=self.group_var,
                        value="none", command=self.apply_grouping).pack(anchor=tk.W)
        ttk.Radiobutton(self.group_frame, text="按情境分群", variable=self.group_var,
                        value="situation", command=self.apply_grouping).pack(anchor=tk.W)
        ttk.Radiobutton(self.group_frame, text="按风格分群", variable=self.group_var,
                        value="style", command=self.apply_grouping).pack(anchor=tk.W)

        # Similarity-threshold slider.
        self.similarity_frame = ttk.LabelFrame(self.control_frame, text="相似度设置")
        self.similarity_frame.pack(fill=tk.X, pady=5)

        self.similarity_var = tk.DoubleVar(value=0.5)
        self.similarity_scale = ttk.Scale(self.similarity_frame, from_=0.0, to=1.0,
                                          variable=self.similarity_var, orient=tk.HORIZONTAL,
                                          command=self.update_similarity)
        self.similarity_scale.pack(fill=tk.X, padx=5, pady=5)
        # Keep an explicit reference to the threshold label instead of fishing
        # it back out of winfo_children()[-1] later (fragile if more widgets
        # are ever added to this frame).
        self.similarity_label = ttk.Label(self.similarity_frame, text="相似度阈值: 0.5")
        self.similarity_label.pack()

        # Display options.
        self.view_frame = ttk.LabelFrame(self.control_frame, text="显示选项")
        self.view_frame.pack(fill=tk.X, pady=5)

        self.show_graph_var = tk.BooleanVar(value=True)
        ttk.Checkbutton(self.view_frame, text="显示关系图", variable=self.show_graph_var,
                        command=self.toggle_graph).pack(anchor=tk.W)

        # Right-hand content area.
        self.content_frame = ttk.Frame(self.main_frame)
        self.content_frame.pack(side=tk.LEFT, fill=tk.BOTH, expand=True)

        # Text display area with a vertical scrollbar.
        self.text_area = tk.Text(self.content_frame, wrap=tk.WORD)
        self.text_area.pack(side=tk.TOP, fill=tk.BOTH, expand=True)

        scrollbar = ttk.Scrollbar(self.text_area, command=self.text_area.yview)
        scrollbar.pack(side=tk.RIGHT, fill=tk.Y)
        self.text_area.config(yscrollcommand=scrollbar.set)

        # Graph display area.
        self.graph_frame = ttk.Frame(self.content_frame)
        self.graph_frame.pack(side=tk.TOP, fill=tk.BOTH, expand=True)

        # Data / graph state.
        self.current_data = []          # list of expression dicts from the loaded file
        self.graph = nx.Graph()         # similarity graph; node index == data index
        self.canvas = None              # FigureCanvasTkAgg when a graph is shown

        # Populate the file combobox and load the first file, if any.
        self.load_file_list()

    def load_file_list(self):
        """Collect all ``*.json`` paths under ``data/expression`` (recursive)
        into the file combobox and auto-load the first one."""
        expression_dir = Path("data/expression")
        files = []
        for walk_root, _, filenames in os.walk(expression_dir):
            for filename in filenames:
                if filename.endswith('.json'):
                    rel_path = os.path.relpath(os.path.join(walk_root, filename), expression_dir)
                    files.append(rel_path)

        self.file_combo['values'] = files
        if files:
            self.file_combo.set(files[0])
            self.load_file(None)

    def load_file(self, event):
        """Load the currently selected JSON file into ``current_data``.

        ``event`` is the Tk event (unused; also called directly with None).
        Errors are reported in the text area instead of raising.
        """
        selected_file = self.file_var.get()
        if not selected_file:
            return

        file_path = os.path.join("data/expression", selected_file)
        try:
            with open(file_path, 'r', encoding='utf-8') as f:
                self.current_data = json.load(f)

            self.apply_sort()
            self.update_similarity()

        except Exception as e:
            self.text_area.delete(1.0, tk.END)
            self.text_area.insert(tk.END, f"加载文件时出错: {str(e)}")

    def apply_sort(self):
        """Sort ``current_data`` in place by the selected field, then regroup."""
        if not self.current_data:
            return

        sort_key = self.sort_var.get()
        if sort_key == "count":
            # Counts are numeric: sort descending and use a numeric default.
            # (The original defaulted to "" for every key, which raises
            # TypeError when a missing "count" is compared against ints.)
            self.current_data.sort(key=lambda x: x.get("count", 0), reverse=True)
        else:
            self.current_data.sort(key=lambda x: x.get(sort_key, ""))
        self.apply_grouping()

    def apply_grouping(self):
        """Render ``current_data`` grouped by the selected key (or flat)."""
        if not self.current_data:
            return

        group_key = self.group_var.get()
        if group_key == "none":
            self.display_data(self.current_data)
            return

        grouped_data = defaultdict(list)
        for item in self.current_data:
            key = item.get(group_key, "未分类")
            grouped_data[key].append(item)

        self.text_area.delete(1.0, tk.END)
        for group, items in grouped_data.items():
            self.text_area.insert(tk.END, f"\n=== {group} ===\n\n")
            for item in items:
                self.text_area.insert(tk.END, f"情境: {item.get('situation', 'N/A')}\n")
                self.text_area.insert(tk.END, f"风格: {item.get('style', 'N/A')}\n")
                self.text_area.insert(tk.END, f"计数: {item.get('count', 'N/A')}\n")
                self.text_area.insert(tk.END, "-" * 50 + "\n")

    def display_data(self, data):
        """Render a flat list of expression dicts into the text area."""
        self.text_area.delete(1.0, tk.END)
        for item in data:
            self.text_area.insert(tk.END, f"情境: {item.get('situation', 'N/A')}\n")
            self.text_area.insert(tk.END, f"风格: {item.get('style', 'N/A')}\n")
            self.text_area.insert(tk.END, f"计数: {item.get('count', 'N/A')}\n")
            self.text_area.insert(tk.END, "-" * 50 + "\n")

    def update_similarity(self, *args):
        """Rebuild the similarity graph for the current threshold.

        Computes pairwise TF-IDF cosine similarity over "situation style"
        strings and adds an edge for every pair above the threshold.
        """
        if not self.current_data:
            return

        threshold = self.similarity_var.get()
        self.similarity_label.config(text=f"相似度阈值: {threshold:.2f}")

        # Use .get like the rest of the class — the original indexed
        # item['situation'] directly and crashed on malformed entries.
        texts = [f"{item.get('situation', '')} {item.get('style', '')}"
                 for item in self.current_data]

        self.graph.clear()
        for i, item in enumerate(self.current_data):
            self.graph.add_node(i, label=f"{item.get('situation', '')}\n{item.get('style', '')}")

        # TfidfVectorizer needs a non-empty vocabulary and similarity needs
        # at least two documents; guard both instead of raising.
        if len(texts) >= 2:
            try:
                tfidf_matrix = TfidfVectorizer().fit_transform(texts)
                similarity_matrix = cosine_similarity(tfidf_matrix)
            except ValueError:
                similarity_matrix = None  # e.g. "empty vocabulary" on blank texts
            if similarity_matrix is not None:
                for i in range(len(self.current_data)):
                    for j in range(i + 1, len(self.current_data)):
                        if similarity_matrix[i, j] > threshold:
                            self.graph.add_edge(i, j, weight=similarity_matrix[i, j])

        if self.show_graph_var.get():
            self.draw_graph()

    def draw_graph(self):
        """Draw the current similarity graph into the Tk canvas area."""
        if self.canvas:
            # Destroy the old widget AND close its figure — the original
            # leaked one matplotlib figure per redraw.
            old_fig = self.canvas.figure
            self.canvas.get_tk_widget().destroy()
            plt.close(old_fig)

        fig = plt.figure(figsize=(8, 6))
        pos = nx.spring_layout(self.graph)

        nx.draw_networkx_nodes(self.graph, pos, node_color='lightblue',
                               node_size=1000, alpha=0.6)
        nx.draw_networkx_edges(self.graph, pos, alpha=0.4)

        labels = nx.get_node_attributes(self.graph, 'label')
        nx.draw_networkx_labels(self.graph, pos, labels, font_size=8)

        plt.title("表达方式关系图")
        plt.axis('off')

        self.canvas = FigureCanvasTkAgg(fig, master=self.graph_frame)
        self.canvas.draw()
        self.canvas.get_tk_widget().pack(fill=tk.BOTH, expand=True)

    def toggle_graph(self):
        """Show or hide the relationship graph per the checkbox state."""
        if self.show_graph_var.get():
            self.draw_graph()
        else:
            if self.canvas:
                old_fig = self.canvas.figure
                self.canvas.get_tk_widget().destroy()
                plt.close(old_fig)  # release the hidden figure as well
                self.canvas = None

    def filter_expressions(self, *args):
        """Live-filter the display by the search box (case-insensitive
        substring match on situation or style). Empty search restores the
        full sorted view."""
        search_text = self.search_var.get().lower()
        if not search_text:
            self.apply_sort()
            return

        filtered_data = []
        for item in self.current_data:
            situation = item.get('situation', '').lower()
            style = item.get('style', '').lower()
            if search_text in situation or search_text in style:
                filtered_data.append(item)

        self.display_data(filtered_data)
|
||||||
|
|
||||||
|
def main():
    """Create the Tk root window, attach the viewer, and run the event loop."""
    app_root = tk.Tk()
    viewer = ExpressionViewer(app_root)  # keep a reference for the app's lifetime
    app_root.mainloop()
|
||||||
|
|
||||||
|
# Allow running this file directly as a script without side effects on import.
if __name__ == "__main__":
    main()
|
||||||
@@ -124,9 +124,7 @@ class TelemetryHeartBeatTask(AsyncTask):
|
|||||||
timeout=5, # 设置超时时间为5秒
|
timeout=5, # 设置超时时间为5秒
|
||||||
)
|
)
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
# 你知道为什么设置成debug吗?
|
logger.warning(f"(此错误不会影响正常使用)状态未发生: {e}")
|
||||||
# 因为我不想看到
|
|
||||||
logger.debug(f"心跳发送失败: {e}")
|
|
||||||
|
|
||||||
logger.debug(response)
|
logger.debug(response)
|
||||||
|
|
||||||
@@ -136,21 +134,21 @@ class TelemetryHeartBeatTask(AsyncTask):
|
|||||||
logger.debug(f"心跳发送成功,状态码: {response.status_code}")
|
logger.debug(f"心跳发送成功,状态码: {response.status_code}")
|
||||||
elif response.status_code == 403:
|
elif response.status_code == 403:
|
||||||
# 403 Forbidden
|
# 403 Forbidden
|
||||||
logger.error(
|
logger.warning(
|
||||||
"心跳发送失败,403 Forbidden: 可能是UUID无效或未注册。"
|
"(此错误不会影响正常使用)心跳发送失败,403 Forbidden: 可能是UUID无效或未注册。"
|
||||||
"处理措施:重置UUID,下次发送心跳时将尝试重新注册。"
|
"处理措施:重置UUID,下次发送心跳时将尝试重新注册。"
|
||||||
)
|
)
|
||||||
self.client_uuid = None
|
self.client_uuid = None
|
||||||
del local_storage["mmc_uuid"] # 删除本地存储的UUID
|
del local_storage["mmc_uuid"] # 删除本地存储的UUID
|
||||||
else:
|
else:
|
||||||
# 其他错误
|
# 其他错误
|
||||||
logger.error(f"心跳发送失败,状态码: {response.status_code}, 响应内容: {response.text}")
|
logger.warning(f"(此错误不会影响正常使用)状态未发送,状态码: {response.status_code}, 响应内容: {response.text}")
|
||||||
|
|
||||||
async def run(self):
|
async def run(self):
|
||||||
# 发送心跳
|
# 发送心跳
|
||||||
if global_config.telemetry.enable:
|
if global_config.telemetry.enable:
|
||||||
if self.client_uuid is None and not await self._req_uuid():
|
if self.client_uuid is None and not await self._req_uuid():
|
||||||
logger.error("获取UUID失败,跳过此次心跳")
|
logger.warning("获取UUID失败,跳过此次心跳")
|
||||||
return
|
return
|
||||||
|
|
||||||
await self._send_heartbeat()
|
await self._send_heartbeat()
|
||||||
|
|||||||
@@ -6,6 +6,7 @@ from src.chat.utils.prompt_builder import Prompt, global_prompt_manager
|
|||||||
from typing import List, Tuple
|
from typing import List, Tuple
|
||||||
import os
|
import os
|
||||||
import json
|
import json
|
||||||
|
from datetime import datetime
|
||||||
|
|
||||||
logger = get_logger("expressor")
|
logger = get_logger("expressor")
|
||||||
|
|
||||||
@@ -45,11 +46,30 @@ class PersonalityExpression:
|
|||||||
if os.path.exists(self.meta_file_path):
|
if os.path.exists(self.meta_file_path):
|
||||||
try:
|
try:
|
||||||
with open(self.meta_file_path, "r", encoding="utf-8") as f:
|
with open(self.meta_file_path, "r", encoding="utf-8") as f:
|
||||||
return json.load(f)
|
meta_data = json.load(f)
|
||||||
|
# 检查是否有last_update_time字段
|
||||||
|
if "last_update_time" not in meta_data:
|
||||||
|
logger.warning(f"{self.meta_file_path} 中缺少last_update_time字段,将重新开始。")
|
||||||
|
# 清空并重写元数据文件
|
||||||
|
self._write_meta_data({"last_style_text": None, "count": 0, "last_update_time": None})
|
||||||
|
# 清空并重写表达文件
|
||||||
|
if os.path.exists(self.expressions_file_path):
|
||||||
|
with open(self.expressions_file_path, "w", encoding="utf-8") as f:
|
||||||
|
json.dump([], f, ensure_ascii=False, indent=2)
|
||||||
|
logger.debug(f"已清空表达文件: {self.expressions_file_path}")
|
||||||
|
return {"last_style_text": None, "count": 0, "last_update_time": None}
|
||||||
|
return meta_data
|
||||||
except json.JSONDecodeError:
|
except json.JSONDecodeError:
|
||||||
logger.warning(f"无法解析 {self.meta_file_path} 中的JSON数据,将重新开始。")
|
logger.warning(f"无法解析 {self.meta_file_path} 中的JSON数据,将重新开始。")
|
||||||
return {"last_style_text": None, "count": 0}
|
# 清空并重写元数据文件
|
||||||
return {"last_style_text": None, "count": 0}
|
self._write_meta_data({"last_style_text": None, "count": 0, "last_update_time": None})
|
||||||
|
# 清空并重写表达文件
|
||||||
|
if os.path.exists(self.expressions_file_path):
|
||||||
|
with open(self.expressions_file_path, "w", encoding="utf-8") as f:
|
||||||
|
json.dump([], f, ensure_ascii=False, indent=2)
|
||||||
|
logger.debug(f"已清空表达文件: {self.expressions_file_path}")
|
||||||
|
return {"last_style_text": None, "count": 0, "last_update_time": None}
|
||||||
|
return {"last_style_text": None, "count": 0, "last_update_time": None}
|
||||||
|
|
||||||
def _write_meta_data(self, data):
|
def _write_meta_data(self, data):
|
||||||
os.makedirs(os.path.dirname(self.meta_file_path), exist_ok=True)
|
os.makedirs(os.path.dirname(self.meta_file_path), exist_ok=True)
|
||||||
@@ -84,7 +104,7 @@ class PersonalityExpression:
|
|||||||
if count >= self.max_calculations:
|
if count >= self.max_calculations:
|
||||||
logger.debug(f"对于风格 '{current_style_text}' 已达到最大计算次数 ({self.max_calculations})。跳过提取。")
|
logger.debug(f"对于风格 '{current_style_text}' 已达到最大计算次数 ({self.max_calculations})。跳过提取。")
|
||||||
# 即使跳过,也更新元数据以反映当前风格已被识别且计数已满
|
# 即使跳过,也更新元数据以反映当前风格已被识别且计数已满
|
||||||
self._write_meta_data({"last_style_text": current_style_text, "count": count})
|
self._write_meta_data({"last_style_text": current_style_text, "count": count, "last_update_time": meta_data.get("last_update_time")})
|
||||||
return
|
return
|
||||||
|
|
||||||
# 构建prompt
|
# 构建prompt
|
||||||
@@ -99,30 +119,63 @@ class PersonalityExpression:
|
|||||||
except Exception as e:
|
except Exception as e:
|
||||||
logger.error(f"个性表达方式提取失败: {e}")
|
logger.error(f"个性表达方式提取失败: {e}")
|
||||||
# 如果提取失败,保存当前的风格和未增加的计数
|
# 如果提取失败,保存当前的风格和未增加的计数
|
||||||
self._write_meta_data({"last_style_text": current_style_text, "count": count})
|
self._write_meta_data({"last_style_text": current_style_text, "count": count, "last_update_time": meta_data.get("last_update_time")})
|
||||||
return
|
return
|
||||||
|
|
||||||
logger.info(f"个性表达方式提取response: {response}")
|
logger.info(f"个性表达方式提取response: {response}")
|
||||||
# chat_id用personality
|
# chat_id用personality
|
||||||
expressions = self.parse_expression_response(response, "personality")
|
|
||||||
# 转为dict并count=100
|
# 转为dict并count=100
|
||||||
result = []
|
if response != "":
|
||||||
for _, situation, style in expressions:
|
expressions = self.parse_expression_response(response, "personality")
|
||||||
result.append({"situation": situation, "style": style, "count": 100})
|
# 读取已有的表达方式
|
||||||
# 超过50条时随机删除多余的,只保留50条
|
existing_expressions = []
|
||||||
if len(result) > 50:
|
if os.path.exists(self.expressions_file_path):
|
||||||
remove_count = len(result) - 50
|
try:
|
||||||
remove_indices = set(random.sample(range(len(result)), remove_count))
|
with open(self.expressions_file_path, "r", encoding="utf-8") as f:
|
||||||
result = [item for idx, item in enumerate(result) if idx not in remove_indices]
|
existing_expressions = json.load(f)
|
||||||
|
except (json.JSONDecodeError, FileNotFoundError):
|
||||||
|
logger.warning(f"无法读取或解析 {self.expressions_file_path},将创建新的表达文件。")
|
||||||
|
|
||||||
|
# 创建新的表达方式
|
||||||
|
new_expressions = []
|
||||||
|
for _, situation, style in expressions:
|
||||||
|
new_expressions.append({"situation": situation, "style": style, "count": 1})
|
||||||
|
|
||||||
|
# 合并表达方式,如果situation和style相同则累加count
|
||||||
|
merged_expressions = existing_expressions.copy()
|
||||||
|
for new_expr in new_expressions:
|
||||||
|
found = False
|
||||||
|
for existing_expr in merged_expressions:
|
||||||
|
if (existing_expr["situation"] == new_expr["situation"] and
|
||||||
|
existing_expr["style"] == new_expr["style"]):
|
||||||
|
existing_expr["count"] += new_expr["count"]
|
||||||
|
found = True
|
||||||
|
break
|
||||||
|
if not found:
|
||||||
|
merged_expressions.append(new_expr)
|
||||||
|
|
||||||
|
# 超过50条时随机删除多余的,只保留50条
|
||||||
|
if len(merged_expressions) > 50:
|
||||||
|
remove_count = len(merged_expressions) - 50
|
||||||
|
remove_indices = set(random.sample(range(len(merged_expressions)), remove_count))
|
||||||
|
merged_expressions = [item for idx, item in enumerate(merged_expressions) if idx not in remove_indices]
|
||||||
|
|
||||||
with open(self.expressions_file_path, "w", encoding="utf-8") as f:
|
with open(self.expressions_file_path, "w", encoding="utf-8") as f:
|
||||||
json.dump(result, f, ensure_ascii=False, indent=2)
|
json.dump(merged_expressions, f, ensure_ascii=False, indent=2)
|
||||||
logger.info(f"已写入{len(result)}条表达到{self.expressions_file_path}")
|
logger.info(f"已写入{len(merged_expressions)}条表达到{self.expressions_file_path}")
|
||||||
|
|
||||||
# 成功提取后更新元数据
|
# 成功提取后更新元数据
|
||||||
count += 1
|
count += 1
|
||||||
self._write_meta_data({"last_style_text": current_style_text, "count": count})
|
current_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
|
||||||
logger.info(f"成功处理。风格 '{current_style_text}' 的计数现在是 {count}。")
|
self._write_meta_data({
|
||||||
|
"last_style_text": current_style_text,
|
||||||
|
"count": count,
|
||||||
|
"last_update_time": current_time
|
||||||
|
})
|
||||||
|
logger.info(f"成功处理。风格 '{current_style_text}' 的计数现在是 {count},最后更新时间:{current_time}。")
|
||||||
|
else:
|
||||||
|
logger.warning(f"个性表达方式提取失败,模型返回空内容: {response}")
|
||||||
|
|
||||||
def parse_expression_response(self, response: str, chat_id: str) -> List[Tuple[str, str, str]]:
|
def parse_expression_response(self, response: str, chat_id: str) -> List[Tuple[str, str, str]]:
|
||||||
"""
|
"""
|
||||||
|
|||||||
Reference in New Issue
Block a user