Merge branch 'dev' of https://github.com/MaiM-with-u/MaiBot into dev
This commit is contained in:
167
scripts/import_openie.py
Normal file
167
scripts/import_openie.py
Normal file
@@ -0,0 +1,167 @@
|
||||
# try:
|
||||
# import src.plugins.knowledge.lib.quick_algo
|
||||
# except ImportError:
|
||||
# print("未找到quick_algo库,无法使用quick_algo算法")
|
||||
# print("请安装quick_algo库 - 在lib.quick_algo中,执行命令:python setup.py build_ext --inplace")
|
||||
|
||||
import sys
|
||||
import os
|
||||
|
||||
sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), "..")))
|
||||
from typing import Dict, List
|
||||
|
||||
from src.plugins.knowledge.src.lpmmconfig import PG_NAMESPACE, global_config
|
||||
from src.plugins.knowledge.src.embedding_store import EmbeddingManager
|
||||
from src.plugins.knowledge.src.llm_client import LLMClient
|
||||
from src.plugins.knowledge.src.open_ie import OpenIE
|
||||
from src.plugins.knowledge.src.kg_manager import KGManager
|
||||
from src.common.logger import get_module_logger
|
||||
from src.plugins.knowledge.src.utils.hash import get_sha256
|
||||
|
||||
|
||||
# 添加项目根目录到 sys.path
|
||||
|
||||
|
||||
logger = get_module_logger("LPMM知识库-OpenIE导入")
|
||||
|
||||
|
||||
def hash_deduplicate(
|
||||
raw_paragraphs: Dict[str, str],
|
||||
triple_list_data: Dict[str, List[List[str]]],
|
||||
stored_pg_hashes: set,
|
||||
stored_paragraph_hashes: set,
|
||||
):
|
||||
"""Hash去重
|
||||
|
||||
Args:
|
||||
raw_paragraphs: 索引的段落原文
|
||||
triple_list_data: 索引的三元组列表
|
||||
stored_pg_hashes: 已存储的段落hash集合
|
||||
stored_paragraph_hashes: 已存储的段落hash集合
|
||||
|
||||
Returns:
|
||||
new_raw_paragraphs: 去重后的段落
|
||||
new_triple_list_data: 去重后的三元组
|
||||
"""
|
||||
# 保存去重后的段落
|
||||
new_raw_paragraphs = dict()
|
||||
# 保存去重后的三元组
|
||||
new_triple_list_data = dict()
|
||||
|
||||
for _, (raw_paragraph, triple_list) in enumerate(zip(raw_paragraphs.values(), triple_list_data.values())):
|
||||
# 段落hash
|
||||
paragraph_hash = get_sha256(raw_paragraph)
|
||||
if ((PG_NAMESPACE + "-" + paragraph_hash) in stored_pg_hashes) and (paragraph_hash in stored_paragraph_hashes):
|
||||
continue
|
||||
new_raw_paragraphs[paragraph_hash] = raw_paragraph
|
||||
new_triple_list_data[paragraph_hash] = triple_list
|
||||
|
||||
return new_raw_paragraphs, new_triple_list_data
|
||||
|
||||
|
||||
def handle_import_openie(openie_data: OpenIE, embed_manager: EmbeddingManager, kg_manager: KGManager) -> bool:
|
||||
# 从OpenIE数据中提取段落原文与三元组列表
|
||||
# 索引的段落原文
|
||||
raw_paragraphs = openie_data.extract_raw_paragraph_dict()
|
||||
# 索引的实体列表
|
||||
entity_list_data = openie_data.extract_entity_dict()
|
||||
# 索引的三元组列表
|
||||
triple_list_data = openie_data.extract_triple_dict()
|
||||
if len(raw_paragraphs) != len(entity_list_data) or len(raw_paragraphs) != len(triple_list_data):
|
||||
logger.error("OpenIE数据存在异常")
|
||||
return False
|
||||
# 将索引换为对应段落的hash值
|
||||
logger.info("正在进行段落去重与重索引")
|
||||
raw_paragraphs, triple_list_data = hash_deduplicate(
|
||||
raw_paragraphs,
|
||||
triple_list_data,
|
||||
embed_manager.stored_pg_hashes,
|
||||
kg_manager.stored_paragraph_hashes,
|
||||
)
|
||||
if len(raw_paragraphs) != 0:
|
||||
# 获取嵌入并保存
|
||||
logger.info(f"段落去重完成,剩余待处理的段落数量:{len(raw_paragraphs)}")
|
||||
logger.info("开始Embedding")
|
||||
embed_manager.store_new_data_set(raw_paragraphs, triple_list_data)
|
||||
# Embedding-Faiss重索引
|
||||
logger.info("正在重新构建向量索引")
|
||||
embed_manager.rebuild_faiss_index()
|
||||
logger.info("向量索引构建完成")
|
||||
embed_manager.save_to_file()
|
||||
logger.info("Embedding完成")
|
||||
# 构建新段落的RAG
|
||||
logger.info("开始构建RAG")
|
||||
kg_manager.build_kg(triple_list_data, embed_manager)
|
||||
kg_manager.save_to_file()
|
||||
logger.info("RAG构建完成")
|
||||
else:
|
||||
logger.info("无新段落需要处理")
|
||||
return True
|
||||
|
||||
|
||||
def main():
|
||||
# 新增确认提示
|
||||
print("=== 重要操作确认 ===")
|
||||
print("OpenIE导入时会大量发送请求,可能会撞到请求速度上限,请注意选用的模型")
|
||||
print("同之前样例:在本地模型下,在70分钟内我们发送了约8万条请求,在网络允许下,速度会更快")
|
||||
print("推荐使用硅基流动的Pro/BAAI/bge-m3")
|
||||
print("每百万Token费用为0.7元")
|
||||
print("知识导入时,会消耗大量系统资源,建议在较好配置电脑上运行")
|
||||
print("同上样例,导入时10700K几乎跑满,14900HX占用80%,峰值内存占用约3G")
|
||||
confirm = input("确认继续执行?(y/n): ").strip().lower()
|
||||
if confirm != "y":
|
||||
logger.info("用户取消操作")
|
||||
print("操作已取消")
|
||||
sys.exit(1)
|
||||
print("\n" + "=" * 40 + "\n")
|
||||
|
||||
logger.info("----开始导入openie数据----\n")
|
||||
|
||||
logger.info("创建LLM客户端")
|
||||
llm_client_list = dict()
|
||||
for key in global_config["llm_providers"]:
|
||||
llm_client_list[key] = LLMClient(
|
||||
global_config["llm_providers"][key]["base_url"],
|
||||
global_config["llm_providers"][key]["api_key"],
|
||||
)
|
||||
|
||||
# 初始化Embedding库
|
||||
embed_manager = embed_manager = EmbeddingManager(llm_client_list[global_config["embedding"]["provider"]])
|
||||
logger.info("正在从文件加载Embedding库")
|
||||
try:
|
||||
embed_manager.load_from_file()
|
||||
except Exception as e:
|
||||
logger.error("从文件加载Embedding库时发生错误:{}".format(e))
|
||||
logger.info("Embedding库加载完成")
|
||||
# 初始化KG
|
||||
kg_manager = KGManager()
|
||||
logger.info("正在从文件加载KG")
|
||||
try:
|
||||
kg_manager.load_from_file()
|
||||
except Exception as e:
|
||||
logger.error("从文件加载KG时发生错误:{}".format(e))
|
||||
logger.info("KG加载完成")
|
||||
|
||||
logger.info(f"KG节点数量:{len(kg_manager.graph.get_node_list())}")
|
||||
logger.info(f"KG边数量:{len(kg_manager.graph.get_edge_list())}")
|
||||
|
||||
# 数据比对:Embedding库与KG的段落hash集合
|
||||
for pg_hash in kg_manager.stored_paragraph_hashes:
|
||||
key = PG_NAMESPACE + "-" + pg_hash
|
||||
if key not in embed_manager.stored_pg_hashes:
|
||||
logger.warning(f"KG中存在Embedding库中不存在的段落:{key}")
|
||||
|
||||
logger.info("正在导入OpenIE数据文件")
|
||||
try:
|
||||
openie_data = OpenIE.load()
|
||||
except Exception as e:
|
||||
logger.error("导入OpenIE数据文件时发生错误:{}".format(e))
|
||||
return False
|
||||
if handle_import_openie(openie_data, embed_manager, kg_manager) is False:
|
||||
logger.error("处理OpenIE数据时发生错误")
|
||||
return False
|
||||
return None
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
178
scripts/info_extraction.py
Normal file
178
scripts/info_extraction.py
Normal file
@@ -0,0 +1,178 @@
|
||||
import json
|
||||
import os
|
||||
import signal
|
||||
from concurrent.futures import ThreadPoolExecutor, as_completed
|
||||
from threading import Lock, Event
|
||||
import sys
|
||||
|
||||
sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), "..")))
|
||||
# 添加项目根目录到 sys.path
|
||||
|
||||
import tqdm
|
||||
|
||||
from src.common.logger import get_module_logger
|
||||
from src.plugins.knowledge.src.lpmmconfig import global_config
|
||||
from src.plugins.knowledge.src.ie_process import info_extract_from_str
|
||||
from src.plugins.knowledge.src.llm_client import LLMClient
|
||||
from src.plugins.knowledge.src.open_ie import OpenIE
|
||||
from src.plugins.knowledge.src.raw_processing import load_raw_data
|
||||
|
||||
logger = get_module_logger("LPMM知识库-信息提取")
|
||||
|
||||
TEMP_DIR = "./temp"
|
||||
|
||||
# 创建一个线程安全的锁,用于保护文件操作和共享数据
|
||||
file_lock = Lock()
|
||||
open_ie_doc_lock = Lock()
|
||||
|
||||
# 创建一个事件标志,用于控制程序终止
|
||||
shutdown_event = Event()
|
||||
|
||||
|
||||
def process_single_text(pg_hash, raw_data, llm_client_list):
|
||||
"""处理单个文本的函数,用于线程池"""
|
||||
temp_file_path = f"{TEMP_DIR}/{pg_hash}.json"
|
||||
|
||||
# 使用文件锁检查和读取缓存文件
|
||||
with file_lock:
|
||||
if os.path.exists(temp_file_path):
|
||||
try:
|
||||
# 存在对应的提取结果
|
||||
logger.info(f"找到缓存的提取结果:{pg_hash}")
|
||||
with open(temp_file_path, "r", encoding="utf-8") as f:
|
||||
return json.load(f), None
|
||||
except json.JSONDecodeError:
|
||||
# 如果JSON文件损坏,删除它并重新处理
|
||||
logger.warning(f"缓存文件损坏,重新处理:{pg_hash}")
|
||||
os.remove(temp_file_path)
|
||||
|
||||
entity_list, rdf_triple_list = info_extract_from_str(
|
||||
llm_client_list[global_config["entity_extract"]["llm"]["provider"]],
|
||||
llm_client_list[global_config["rdf_build"]["llm"]["provider"]],
|
||||
raw_data,
|
||||
)
|
||||
if entity_list is None or rdf_triple_list is None:
|
||||
return None, pg_hash
|
||||
else:
|
||||
doc_item = {
|
||||
"idx": pg_hash,
|
||||
"passage": raw_data,
|
||||
"extracted_entities": entity_list,
|
||||
"extracted_triples": rdf_triple_list,
|
||||
}
|
||||
# 保存临时提取结果
|
||||
with file_lock:
|
||||
try:
|
||||
with open(temp_file_path, "w", encoding="utf-8") as f:
|
||||
json.dump(doc_item, f, ensure_ascii=False, indent=4)
|
||||
except Exception as e:
|
||||
logger.error(f"保存缓存文件失败:{pg_hash}, 错误:{e}")
|
||||
# 如果保存失败,确保不会留下损坏的文件
|
||||
if os.path.exists(temp_file_path):
|
||||
os.remove(temp_file_path)
|
||||
# 设置shutdown_event以终止程序
|
||||
shutdown_event.set()
|
||||
return None, pg_hash
|
||||
return doc_item, None
|
||||
|
||||
|
||||
def signal_handler(signum, frame):
|
||||
"""处理Ctrl+C信号"""
|
||||
logger.info("\n接收到中断信号,正在优雅地关闭程序...")
|
||||
shutdown_event.set()
|
||||
|
||||
|
||||
def main():
|
||||
# 设置信号处理器
|
||||
signal.signal(signal.SIGINT, signal_handler)
|
||||
|
||||
# 新增用户确认提示
|
||||
print("=== 重要操作确认 ===")
|
||||
print("实体提取操作将会花费较多资金和时间,建议在空闲时段执行。")
|
||||
print("举例:600万字全剧情,提取选用deepseek v3 0324,消耗约40元,约3小时。")
|
||||
print("建议使用硅基流动的非Pro模型")
|
||||
print("或者使用可以用赠金抵扣的Pro模型")
|
||||
print("请确保账户余额充足,并且在执行前确认无误。")
|
||||
confirm = input("确认继续执行?(y/n): ").strip().lower()
|
||||
if confirm != "y":
|
||||
logger.info("用户取消操作")
|
||||
print("操作已取消")
|
||||
sys.exit(1)
|
||||
print("\n" + "=" * 40 + "\n")
|
||||
|
||||
logger.info("--------进行信息提取--------\n")
|
||||
|
||||
logger.info("创建LLM客户端")
|
||||
llm_client_list = dict()
|
||||
for key in global_config["llm_providers"]:
|
||||
llm_client_list[key] = LLMClient(
|
||||
global_config["llm_providers"][key]["base_url"],
|
||||
global_config["llm_providers"][key]["api_key"],
|
||||
)
|
||||
|
||||
logger.info("正在加载原始数据")
|
||||
sha256_list, raw_datas = load_raw_data()
|
||||
logger.info("原始数据加载完成\n")
|
||||
|
||||
# 创建临时目录
|
||||
if not os.path.exists(f"{TEMP_DIR}"):
|
||||
os.makedirs(f"{TEMP_DIR}")
|
||||
|
||||
failed_sha256 = []
|
||||
open_ie_doc = []
|
||||
|
||||
# 创建线程池,最大线程数为50
|
||||
workers = global_config["info_extraction"]["workers"]
|
||||
with ThreadPoolExecutor(max_workers=workers) as executor:
|
||||
# 提交所有任务到线程池
|
||||
future_to_hash = {
|
||||
executor.submit(process_single_text, pg_hash, raw_data, llm_client_list): pg_hash
|
||||
for pg_hash, raw_data in zip(sha256_list, raw_datas)
|
||||
}
|
||||
|
||||
# 使用tqdm显示进度
|
||||
with tqdm.tqdm(total=len(future_to_hash), postfix="正在进行提取:") as pbar:
|
||||
# 处理完成的任务
|
||||
try:
|
||||
for future in as_completed(future_to_hash):
|
||||
if shutdown_event.is_set():
|
||||
# 取消所有未完成的任务
|
||||
for f in future_to_hash:
|
||||
if not f.done():
|
||||
f.cancel()
|
||||
break
|
||||
|
||||
doc_item, failed_hash = future.result()
|
||||
if failed_hash:
|
||||
failed_sha256.append(failed_hash)
|
||||
logger.error(f"提取失败:{failed_hash}")
|
||||
elif doc_item:
|
||||
with open_ie_doc_lock:
|
||||
open_ie_doc.append(doc_item)
|
||||
pbar.update(1)
|
||||
except KeyboardInterrupt:
|
||||
# 如果在这里捕获到KeyboardInterrupt,说明signal_handler可能没有正常工作
|
||||
logger.info("\n接收到中断信号,正在优雅地关闭程序...")
|
||||
shutdown_event.set()
|
||||
# 取消所有未完成的任务
|
||||
for f in future_to_hash:
|
||||
if not f.done():
|
||||
f.cancel()
|
||||
|
||||
# 保存信息提取结果
|
||||
sum_phrase_chars = sum([len(e) for chunk in open_ie_doc for e in chunk["extracted_entities"]])
|
||||
sum_phrase_words = sum([len(e.split()) for chunk in open_ie_doc for e in chunk["extracted_entities"]])
|
||||
num_phrases = sum([len(chunk["extracted_entities"]) for chunk in open_ie_doc])
|
||||
openie_obj = OpenIE(
|
||||
open_ie_doc,
|
||||
round(sum_phrase_chars / num_phrases, 4),
|
||||
round(sum_phrase_words / num_phrases, 4),
|
||||
)
|
||||
OpenIE.save(openie_obj)
|
||||
|
||||
logger.info("--------信息提取完成--------")
|
||||
logger.info(f"提取失败的文段SHA256:{failed_sha256}")
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
569
scripts/interest_monitor_gui.py
Normal file
569
scripts/interest_monitor_gui.py
Normal file
@@ -0,0 +1,569 @@
|
||||
import tkinter as tk
|
||||
from tkinter import ttk
|
||||
import time
|
||||
import os
|
||||
from datetime import datetime, timedelta
|
||||
import random
|
||||
from collections import deque
|
||||
import json # 引入 json
|
||||
|
||||
# --- 引入 Matplotlib ---
|
||||
from matplotlib.figure import Figure
|
||||
from matplotlib.backends.backend_tkagg import FigureCanvasTkAgg
|
||||
import matplotlib.dates as mdates # 用于处理日期格式
|
||||
import matplotlib # 导入 matplotlib
|
||||
|
||||
# --- 配置 ---
|
||||
LOG_FILE_PATH = os.path.join("logs", "interest", "interest_history.log") # 指向历史日志文件
|
||||
REFRESH_INTERVAL_MS = 200 # 刷新间隔 (毫秒) - 可以适当调长,因为读取文件可能耗时
|
||||
WINDOW_TITLE = "Interest Monitor (Live History)"
|
||||
MAX_HISTORY_POINTS = 1000 # 图表上显示的最大历史点数 (可以增加)
|
||||
MAX_STREAMS_TO_DISPLAY = 15 # 最多显示多少个聊天流的折线图 (可以增加)
|
||||
|
||||
# *** 添加 Matplotlib 中文字体配置 ***
|
||||
# 尝试使用 'SimHei' 或 'Microsoft YaHei',如果找不到,matplotlib 会回退到默认字体
|
||||
# 确保你的系统上安装了这些字体
|
||||
matplotlib.rcParams["font.sans-serif"] = ["SimHei", "Microsoft YaHei"]
|
||||
matplotlib.rcParams["axes.unicode_minus"] = False # 解决负号'-'显示为方块的问题
|
||||
|
||||
|
||||
class InterestMonitorApp:
|
||||
def __init__(self, root):
|
||||
self.root = root
|
||||
self.root.title(WINDOW_TITLE)
|
||||
self.root.geometry("1800x800") # 调整窗口大小以适应图表
|
||||
|
||||
# --- 数据存储 ---
|
||||
# 使用 deque 来存储有限的历史数据点
|
||||
# key: stream_id, value: deque([(timestamp, interest_level), ...])
|
||||
self.stream_history = {}
|
||||
# key: stream_id, value: deque([(timestamp, reply_probability), ...])
|
||||
self.probability_history = {}
|
||||
self.stream_colors = {} # 为每个 stream 分配颜色
|
||||
self.stream_display_names = {} # 存储显示名称 (group_name)
|
||||
self.selected_stream_id = tk.StringVar() # 用于 Combobox 绑定
|
||||
|
||||
# --- 新增:存储其他参数 ---
|
||||
# 顶层信息
|
||||
self.latest_main_mind = tk.StringVar(value="N/A")
|
||||
self.latest_mai_state = tk.StringVar(value="N/A")
|
||||
self.latest_subflow_count = tk.IntVar(value=0)
|
||||
# 子流最新状态 (key: stream_id)
|
||||
self.stream_sub_minds = {}
|
||||
self.stream_chat_states = {}
|
||||
self.stream_threshold_status = {}
|
||||
self.stream_last_active = {}
|
||||
self.stream_last_interaction = {}
|
||||
# 用于显示单个流详情的 StringVar
|
||||
self.single_stream_sub_mind = tk.StringVar(value="想法: N/A")
|
||||
self.single_stream_chat_state = tk.StringVar(value="状态: N/A")
|
||||
self.single_stream_threshold = tk.StringVar(value="阈值: N/A")
|
||||
self.single_stream_last_active = tk.StringVar(value="活跃: N/A")
|
||||
self.single_stream_last_interaction = tk.StringVar(value="交互: N/A")
|
||||
|
||||
# --- UI 元素 ---
|
||||
|
||||
# --- 新增:顶部全局信息框架 ---
|
||||
self.global_info_frame = ttk.Frame(root, padding="5 0 5 5") # 顶部内边距调整
|
||||
self.global_info_frame.pack(side=tk.TOP, fill=tk.X, pady=(5, 0)) # 底部外边距为0
|
||||
|
||||
ttk.Label(self.global_info_frame, text="全局状态:").pack(side=tk.LEFT, padx=(0, 10))
|
||||
ttk.Label(self.global_info_frame, textvariable=self.latest_mai_state).pack(side=tk.LEFT, padx=5)
|
||||
ttk.Label(self.global_info_frame, text="想法:").pack(side=tk.LEFT, padx=(10, 0))
|
||||
ttk.Label(self.global_info_frame, textvariable=self.latest_main_mind).pack(side=tk.LEFT, padx=5)
|
||||
ttk.Label(self.global_info_frame, text="子流数:").pack(side=tk.LEFT, padx=(10, 0))
|
||||
ttk.Label(self.global_info_frame, textvariable=self.latest_subflow_count).pack(side=tk.LEFT, padx=5)
|
||||
|
||||
# 创建 Notebook (选项卡控件)
|
||||
self.notebook = ttk.Notebook(root)
|
||||
# 修改:fill 和 expand,让 notebook 填充剩余空间
|
||||
self.notebook.pack(pady=(5, 0), padx=10, fill=tk.BOTH, expand=1) # 顶部外边距改小
|
||||
|
||||
# --- 第一个选项卡:所有流 ---
|
||||
self.frame_all = ttk.Frame(self.notebook, padding="5 5 5 5")
|
||||
self.notebook.add(self.frame_all, text="所有聊天流")
|
||||
|
||||
# 状态标签 (移动到最底部)
|
||||
self.status_label = tk.Label(root, text="Initializing...", anchor="w", fg="grey")
|
||||
self.status_label.pack(side=tk.BOTTOM, fill=tk.X, padx=10, pady=(0, 5)) # 调整边距
|
||||
|
||||
# Matplotlib 图表设置 (用于第一个选项卡)
|
||||
self.fig = Figure(figsize=(5, 4), dpi=100)
|
||||
self.ax = self.fig.add_subplot(111)
|
||||
# 配置在 update_plot 中进行,避免重复
|
||||
|
||||
# 创建 Tkinter 画布嵌入 Matplotlib 图表 (用于第一个选项卡)
|
||||
self.canvas = FigureCanvasTkAgg(self.fig, master=self.frame_all) # <--- 放入 frame_all
|
||||
self.canvas_widget = self.canvas.get_tk_widget()
|
||||
self.canvas_widget.pack(side=tk.TOP, fill=tk.BOTH, expand=1)
|
||||
|
||||
# --- 第二个选项卡:单个流 ---
|
||||
self.frame_single = ttk.Frame(self.notebook, padding="5 5 5 5")
|
||||
self.notebook.add(self.frame_single, text="单个聊天流详情")
|
||||
|
||||
# 单个流选项卡的上部控制区域
|
||||
self.control_frame_single = ttk.Frame(self.frame_single)
|
||||
self.control_frame_single.pack(side=tk.TOP, fill=tk.X, pady=5)
|
||||
|
||||
ttk.Label(self.control_frame_single, text="选择聊天流:").pack(side=tk.LEFT, padx=(0, 5))
|
||||
self.stream_selector = ttk.Combobox(
|
||||
self.control_frame_single, textvariable=self.selected_stream_id, state="readonly", width=50
|
||||
)
|
||||
self.stream_selector.pack(side=tk.LEFT, fill=tk.X, expand=True)
|
||||
self.stream_selector.bind("<<ComboboxSelected>>", self.on_stream_selected)
|
||||
|
||||
# --- 新增:单个流详情显示区域 ---
|
||||
self.single_stream_details_frame = ttk.Frame(self.frame_single, padding="5 5 5 0")
|
||||
self.single_stream_details_frame.pack(side=tk.TOP, fill=tk.X, pady=(0, 5))
|
||||
|
||||
ttk.Label(self.single_stream_details_frame, textvariable=self.single_stream_sub_mind).pack(side=tk.LEFT, padx=5)
|
||||
ttk.Label(self.single_stream_details_frame, textvariable=self.single_stream_chat_state).pack(
|
||||
side=tk.LEFT, padx=5
|
||||
)
|
||||
ttk.Label(self.single_stream_details_frame, textvariable=self.single_stream_threshold).pack(
|
||||
side=tk.LEFT, padx=5
|
||||
)
|
||||
ttk.Label(self.single_stream_details_frame, textvariable=self.single_stream_last_active).pack(
|
||||
side=tk.LEFT, padx=5
|
||||
)
|
||||
ttk.Label(self.single_stream_details_frame, textvariable=self.single_stream_last_interaction).pack(
|
||||
side=tk.LEFT, padx=5
|
||||
)
|
||||
|
||||
# Matplotlib 图表设置 (用于第二个选项卡)
|
||||
self.fig_single = Figure(figsize=(5, 4), dpi=100)
|
||||
# 修改:创建两个子图,一个显示兴趣度,一个显示概率
|
||||
self.ax_single_interest = self.fig_single.add_subplot(211) # 2行1列的第1个
|
||||
self.ax_single_probability = self.fig_single.add_subplot(
|
||||
212, sharex=self.ax_single_interest
|
||||
) # 2行1列的第2个,共享X轴
|
||||
|
||||
# 创建 Tkinter 画布嵌入 Matplotlib 图表 (用于第二个选项卡)
|
||||
self.canvas_single = FigureCanvasTkAgg(self.fig_single, master=self.frame_single) # <--- 放入 frame_single
|
||||
self.canvas_widget_single = self.canvas_single.get_tk_widget()
|
||||
self.canvas_widget_single.pack(side=tk.TOP, fill=tk.BOTH, expand=1)
|
||||
|
||||
# --- 初始化和启动刷新 ---
|
||||
self.update_display() # 首次加载并开始刷新循环
|
||||
|
||||
def on_stream_selected(self, event=None):
|
||||
"""当 Combobox 选择改变时调用,更新单个流的图表"""
|
||||
self.update_single_stream_plot()
|
||||
|
||||
def get_random_color(self):
|
||||
"""生成随机颜色用于区分线条"""
|
||||
return "#{:06x}".format(random.randint(0, 0xFFFFFF))
|
||||
|
||||
def load_and_update_history(self):
|
||||
"""从 history log 文件加载数据并更新历史记录"""
|
||||
if not os.path.exists(LOG_FILE_PATH):
|
||||
self.set_status(f"Error: Log file not found at {LOG_FILE_PATH}", "red")
|
||||
# 如果文件不存在,不清空现有数据,以便显示最后一次成功读取的状态
|
||||
return
|
||||
|
||||
# *** Reset display names each time we reload ***
|
||||
new_stream_history = {}
|
||||
new_stream_display_names = {}
|
||||
new_probability_history = {} # <--- 重置概率历史
|
||||
# --- 新增:重置其他子流状态 --- (如果需要的话,但通常覆盖即可)
|
||||
# self.stream_sub_minds = {}
|
||||
# self.stream_chat_states = {}
|
||||
# ... 等等 ...
|
||||
|
||||
read_count = 0
|
||||
error_count = 0
|
||||
# *** Calculate the timestamp threshold for the last 30 minutes ***
|
||||
current_time = time.time()
|
||||
time_threshold = current_time - (15 * 60) # 30 minutes in seconds
|
||||
|
||||
try:
|
||||
with open(LOG_FILE_PATH, "r", encoding="utf-8") as f:
|
||||
for line in f:
|
||||
read_count += 1
|
||||
try:
|
||||
log_entry = json.loads(line.strip())
|
||||
timestamp = log_entry.get("timestamp") # 获取顶层时间戳
|
||||
|
||||
# *** 时间过滤 ***
|
||||
if timestamp is None:
|
||||
error_count += 1
|
||||
continue # 跳过没有时间戳的行
|
||||
try:
|
||||
entry_timestamp = float(timestamp)
|
||||
if entry_timestamp < time_threshold:
|
||||
continue # 跳过时间过早的条目
|
||||
except (ValueError, TypeError):
|
||||
error_count += 1
|
||||
continue # 跳过时间戳格式错误的行
|
||||
|
||||
# --- 新增:更新顶层信息 (使用最后一个有效行的数据) ---
|
||||
self.latest_main_mind.set(
|
||||
log_entry.get("main_mind", self.latest_main_mind.get())
|
||||
) # 保留旧值如果缺失
|
||||
self.latest_mai_state.set(log_entry.get("mai_state", self.latest_mai_state.get()))
|
||||
self.latest_subflow_count.set(log_entry.get("subflow_count", self.latest_subflow_count.get()))
|
||||
|
||||
# --- 修改开始:迭代 subflows ---
|
||||
subflows = log_entry.get("subflows")
|
||||
if not isinstance(subflows, list): # 检查 subflows 是否存在且为列表
|
||||
error_count += 1
|
||||
continue # 跳过没有 subflows 或格式无效的行
|
||||
|
||||
for subflow_entry in subflows:
|
||||
stream_id = subflow_entry.get("stream_id")
|
||||
interest_level = subflow_entry.get("interest_level")
|
||||
# 获取 group_name,如果不存在则回退到 stream_id
|
||||
group_name = subflow_entry.get("group_name", stream_id)
|
||||
# reply_probability = subflow_entry.get("reply_probability") # 获取概率值 # <-- 注释掉旧行
|
||||
start_hfc_probability = subflow_entry.get(
|
||||
"start_hfc_probability"
|
||||
) # <-- 添加新行,读取新字段
|
||||
|
||||
# *** 检查必要的字段 ***
|
||||
# 注意:时间戳已在顶层检查过
|
||||
if stream_id is None or interest_level is None:
|
||||
# 这里可以选择记录子流错误,但暂时跳过
|
||||
continue # 跳过无效的 subflow 条目
|
||||
|
||||
# 确保 interest_level 可以转换为浮点数
|
||||
try:
|
||||
interest_level_float = float(interest_level)
|
||||
except (ValueError, TypeError):
|
||||
continue # 跳过 interest_level 无效的 subflow
|
||||
|
||||
# 如果是第一次读到这个 stream_id,则创建 deque
|
||||
if stream_id not in new_stream_history:
|
||||
new_stream_history[stream_id] = deque(maxlen=MAX_HISTORY_POINTS)
|
||||
new_probability_history[stream_id] = deque(maxlen=MAX_HISTORY_POINTS) # 创建概率 deque
|
||||
# 检查是否已有颜色,没有则分配
|
||||
if stream_id not in self.stream_colors:
|
||||
self.stream_colors[stream_id] = self.get_random_color()
|
||||
|
||||
# *** 存储此 stream_id 最新的显示名称 ***
|
||||
new_stream_display_names[stream_id] = group_name
|
||||
|
||||
# --- 新增:存储其他子流信息 ---
|
||||
self.stream_sub_minds[stream_id] = subflow_entry.get("sub_mind", "N/A")
|
||||
self.stream_chat_states[stream_id] = subflow_entry.get("sub_chat_state", "N/A")
|
||||
self.stream_threshold_status[stream_id] = subflow_entry.get("is_above_threshold", False)
|
||||
self.stream_last_active[stream_id] = subflow_entry.get(
|
||||
"chat_state_changed_time"
|
||||
) # 存储原始时间戳
|
||||
self.stream_last_interaction[stream_id] = subflow_entry.get(
|
||||
"last_interaction_time"
|
||||
) # 存储原始时间戳
|
||||
|
||||
# 添加数据点 (使用顶层时间戳)
|
||||
new_stream_history[stream_id].append((entry_timestamp, interest_level_float))
|
||||
|
||||
# 添加概率数据点 (如果存在且有效)
|
||||
# if reply_probability is not None: # <-- 注释掉旧判断
|
||||
if start_hfc_probability is not None: # <-- 修改判断条件
|
||||
try:
|
||||
# 尝试将概率转换为浮点数
|
||||
# probability_float = float(reply_probability) # <-- 注释掉旧转换
|
||||
probability_float = float(start_hfc_probability) # <-- 使用新变量
|
||||
new_probability_history[stream_id].append((entry_timestamp, probability_float))
|
||||
except (TypeError, ValueError):
|
||||
# 如果概率值无效,可以跳过或记录一个默认值,这里跳过
|
||||
pass
|
||||
# --- 修改结束 ---
|
||||
|
||||
except json.JSONDecodeError:
|
||||
error_count += 1
|
||||
# logger.warning(f"Skipping invalid JSON line: {line.strip()}")
|
||||
continue # 跳过无法解析的行
|
||||
# except (TypeError, ValueError) as e: # 这个外层 catch 可能不再需要,因为类型错误在内部处理了
|
||||
# error_count += 1
|
||||
# # logger.warning(f"Skipping line due to data type error ({e}): {line.strip()}")
|
||||
# continue # 跳过数据类型错误的行
|
||||
|
||||
# 读取完成后,用新数据替换旧数据
|
||||
self.stream_history = new_stream_history
|
||||
self.stream_display_names = new_stream_display_names # *** Update display names ***
|
||||
self.probability_history = new_probability_history # <--- 更新概率历史
|
||||
# 清理不再存在的 stream_id 的附加信息 (可选,但保持一致性)
|
||||
streams_to_remove = set(self.stream_sub_minds.keys()) - set(new_stream_history.keys())
|
||||
for sid in streams_to_remove:
|
||||
self.stream_sub_minds.pop(sid, None)
|
||||
self.stream_chat_states.pop(sid, None)
|
||||
self.stream_threshold_status.pop(sid, None)
|
||||
self.stream_last_active.pop(sid, None)
|
||||
self.stream_last_interaction.pop(sid, None)
|
||||
# 颜色和显示名称也应该清理,但当前逻辑是保留旧颜色
|
||||
# self.stream_colors.pop(sid, None)
|
||||
status_msg = f"Data loaded at {datetime.now().strftime('%H:%M:%S')}. Lines read: {read_count}."
|
||||
if error_count > 0:
|
||||
status_msg += f" Skipped {error_count} invalid lines."
|
||||
self.set_status(status_msg, "orange")
|
||||
else:
|
||||
self.set_status(status_msg, "green")
|
||||
|
||||
except IOError as e:
|
||||
self.set_status(f"Error reading file {LOG_FILE_PATH}: {e}", "red")
|
||||
except Exception as e:
|
||||
self.set_status(f"An unexpected error occurred during loading: {e}", "red")
|
||||
|
||||
# --- 更新 Combobox ---
|
||||
self.update_stream_selector()
|
||||
|
||||
def update_stream_selector(self):
|
||||
"""更新单个流选项卡中的 Combobox 列表"""
|
||||
# 创建 (display_name, stream_id) 对的列表,按 display_name 排序
|
||||
available_streams = sorted(
|
||||
[
|
||||
(name, sid)
|
||||
for sid, name in self.stream_display_names.items()
|
||||
if sid in self.stream_history and self.stream_history[sid]
|
||||
],
|
||||
key=lambda item: item[0], # 按显示名称排序
|
||||
)
|
||||
|
||||
# 更新 Combobox 的值 (仅显示 display_name)
|
||||
self.stream_selector["values"] = [name for name, sid in available_streams]
|
||||
|
||||
# 检查当前选中的 stream_id 是否仍然有效
|
||||
current_selection_name = self.selected_stream_id.get()
|
||||
current_selection_valid = any(name == current_selection_name for name, sid in available_streams)
|
||||
|
||||
if not current_selection_valid and available_streams:
|
||||
# 如果当前选择无效,并且有可选流,则默认选中第一个
|
||||
self.selected_stream_id.set(available_streams[0][0])
|
||||
# 手动触发一次更新,因为 set 不会触发 <<ComboboxSelected>>
|
||||
self.update_single_stream_plot()
|
||||
elif not available_streams:
|
||||
# 如果没有可选流,清空选择
|
||||
self.selected_stream_id.set("")
|
||||
self.update_single_stream_plot() # 清空图表
|
||||
|
||||
def update_all_streams_plot(self):
|
||||
"""更新第一个选项卡的 Matplotlib 图表 (显示所有流)"""
|
||||
self.ax.clear() # 清除旧图
|
||||
# *** 设置中文标题和标签 ***
|
||||
self.ax.set_title("兴趣度随时间变化图 (所有活跃流)")
|
||||
self.ax.set_xlabel("时间")
|
||||
self.ax.set_ylabel("兴趣度")
|
||||
self.ax.xaxis.set_major_formatter(mdates.DateFormatter("%H:%M:%S"))
|
||||
self.ax.grid(True)
|
||||
self.ax.set_ylim(0, 10) # 固定 Y 轴范围 0-10
|
||||
|
||||
# 只绘制最新的 N 个 stream (按最后记录的兴趣度排序)
|
||||
# 注意:现在是基于文件读取的快照排序,可能不是实时最新
|
||||
active_streams = sorted(
|
||||
self.stream_history.items(),
|
||||
key=lambda item: item[1][-1][1] if item[1] else 0, # 按最后兴趣度排序
|
||||
reverse=True,
|
||||
)[:MAX_STREAMS_TO_DISPLAY]
|
||||
|
||||
all_times = [] # 用于确定 X 轴范围
|
||||
|
||||
for stream_id, history in active_streams:
|
||||
if not history:
|
||||
continue
|
||||
|
||||
timestamps, interests = zip(*history)
|
||||
# 将 time.time() 时间戳转换为 matplotlib 可识别的日期格式
|
||||
try:
|
||||
mpl_dates = [datetime.fromtimestamp(ts) for ts in timestamps]
|
||||
all_times.extend(mpl_dates) # 收集所有时间点
|
||||
|
||||
# *** Use display name for label ***
|
||||
display_label = self.stream_display_names.get(stream_id, stream_id)
|
||||
|
||||
self.ax.plot(
|
||||
mpl_dates,
|
||||
interests,
|
||||
label=display_label, # *** Use display_label ***
|
||||
color=self.stream_colors.get(stream_id, "grey"),
|
||||
marker=".",
|
||||
markersize=3,
|
||||
linestyle="-",
|
||||
linewidth=1,
|
||||
)
|
||||
except ValueError as e:
|
||||
print(f"Skipping plot for {stream_id} due to invalid timestamp: {e}")
|
||||
continue
|
||||
|
||||
if all_times:
|
||||
# 根据数据动态调整 X 轴范围,留一点边距
|
||||
min_time = min(all_times)
|
||||
max_time = max(all_times)
|
||||
# delta = max_time - min_time
|
||||
# self.ax.set_xlim(min_time - delta * 0.05, max_time + delta * 0.05)
|
||||
self.ax.set_xlim(min_time, max_time)
|
||||
|
||||
# 自动格式化X轴标签
|
||||
self.fig.autofmt_xdate()
|
||||
else:
|
||||
# 如果没有数据,设置一个默认的时间范围,例如最近一小时
|
||||
now = datetime.now()
|
||||
one_hour_ago = now - timedelta(hours=1)
|
||||
self.ax.set_xlim(one_hour_ago, now)
|
||||
|
||||
# 添加图例
|
||||
if active_streams:
|
||||
# 调整图例位置和大小
|
||||
# 字体已通过全局 matplotlib.rcParams 设置
|
||||
self.ax.legend(loc="upper left", bbox_to_anchor=(1.02, 1), borderaxespad=0.0, fontsize="x-small")
|
||||
# 调整布局,确保图例不被裁剪
|
||||
self.fig.tight_layout(rect=[0, 0, 0.85, 1]) # 右侧留出空间给图例
|
||||
|
||||
self.canvas.draw() # 重绘画布
|
||||
|
||||
def update_single_stream_plot(self):
|
||||
"""更新第二个选项卡的 Matplotlib 图表 (显示单个选定的流)"""
|
||||
self.ax_single_interest.clear()
|
||||
self.ax_single_probability.clear()
|
||||
|
||||
# 设置子图标题和标签
|
||||
self.ax_single_interest.set_title("兴趣度")
|
||||
self.ax_single_interest.set_ylim(0, 10) # 固定 Y 轴范围 0-10
|
||||
|
||||
# self.ax_single_probability.set_title("回复评估概率") # <-- 注释掉旧标题
|
||||
self.ax_single_probability.set_title("HFC 启动概率") # <-- 修改标题
|
||||
self.ax_single_probability.set_xlabel("时间")
|
||||
# self.ax_single_probability.set_ylabel("概率") # <-- 注释掉旧标签
|
||||
self.ax_single_probability.set_ylabel("HFC 概率") # <-- 修改 Y 轴标签
|
||||
self.ax_single_probability.grid(True)
|
||||
self.ax_single_probability.set_ylim(0, 1.05) # 固定 Y 轴范围 0-1
|
||||
self.ax_single_probability.xaxis.set_major_formatter(mdates.DateFormatter("%H:%M:%S"))
|
||||
|
||||
selected_name = self.selected_stream_id.get()
|
||||
selected_sid = None
|
||||
|
||||
# --- 新增:根据选中的名称找到 stream_id ---
|
||||
if selected_name:
|
||||
for sid, name in self.stream_display_names.items():
|
||||
if name == selected_name:
|
||||
selected_sid = sid
|
||||
break
|
||||
|
||||
all_times = [] # 用于确定 X 轴范围
|
||||
|
||||
# --- 新增:绘制兴趣度图 ---
|
||||
if selected_sid and selected_sid in self.stream_history and self.stream_history[selected_sid]:
|
||||
history = self.stream_history[selected_sid]
|
||||
timestamps, interests = zip(*history)
|
||||
try:
|
||||
mpl_dates = [datetime.fromtimestamp(ts) for ts in timestamps]
|
||||
all_times.extend(mpl_dates)
|
||||
self.ax_single_interest.plot(
|
||||
mpl_dates,
|
||||
interests,
|
||||
color=self.stream_colors.get(selected_sid, "blue"),
|
||||
marker=".",
|
||||
markersize=3,
|
||||
linestyle="-",
|
||||
linewidth=1,
|
||||
)
|
||||
except ValueError as e:
|
||||
print(f"Skipping interest plot for {selected_sid} due to invalid timestamp: {e}")
|
||||
|
||||
# --- 新增:绘制概率图 ---
|
||||
if selected_sid and selected_sid in self.probability_history and self.probability_history[selected_sid]:
|
||||
prob_history = self.probability_history[selected_sid]
|
||||
prob_timestamps, probabilities = zip(*prob_history)
|
||||
try:
|
||||
prob_mpl_dates = [datetime.fromtimestamp(ts) for ts in prob_timestamps]
|
||||
# 注意:概率图的时间点可能与兴趣度不同,也需要加入 all_times
|
||||
all_times.extend(prob_mpl_dates)
|
||||
self.ax_single_probability.plot(
|
||||
prob_mpl_dates,
|
||||
probabilities,
|
||||
color=self.stream_colors.get(selected_sid, "green"), # 可以用不同颜色
|
||||
marker=".",
|
||||
markersize=3,
|
||||
linestyle="-",
|
||||
linewidth=1,
|
||||
)
|
||||
except ValueError as e:
|
||||
print(f"Skipping probability plot for {selected_sid} due to invalid timestamp: {e}")
|
||||
|
||||
# --- 新增:调整 X 轴范围和格式 ---
|
||||
if all_times:
|
||||
min_time = min(all_times)
|
||||
max_time = max(all_times)
|
||||
# 设置共享的 X 轴范围
|
||||
self.ax_single_interest.set_xlim(min_time, max_time)
|
||||
# self.ax_single_probability.set_xlim(min_time, max_time) # sharex 会自动同步
|
||||
# 自动格式化X轴标签 (应用到共享轴的最后一个子图上通常即可)
|
||||
self.fig_single.autofmt_xdate()
|
||||
else:
|
||||
# 如果没有数据,设置一个默认的时间范围
|
||||
now = datetime.now()
|
||||
one_hour_ago = now - timedelta(hours=1)
|
||||
self.ax_single_interest.set_xlim(one_hour_ago, now)
|
||||
# self.ax_single_probability.set_xlim(one_hour_ago, now) # sharex 会自动同步
|
||||
|
||||
# --- 新增:更新单个流的详细信息标签 ---
|
||||
self.update_single_stream_details(selected_sid)
|
||||
|
||||
# --- 新增:重新绘制画布 ---
|
||||
self.canvas_single.draw()
|
||||
|
||||
def format_timestamp(self, ts):
|
||||
"""辅助函数:格式化时间戳,处理 None 或无效值"""
|
||||
if ts is None:
|
||||
return "N/A"
|
||||
try:
|
||||
# 假设 ts 是 float 类型的时间戳
|
||||
dt_object = datetime.fromtimestamp(float(ts))
|
||||
return dt_object.strftime("%Y-%m-%d %H:%M:%S")
|
||||
except (ValueError, TypeError):
|
||||
return "Invalid Time"
|
||||
|
||||
def update_single_stream_details(self, stream_id):
|
||||
"""更新单个流详情区域的标签内容"""
|
||||
if stream_id:
|
||||
sub_mind = self.stream_sub_minds.get(stream_id, "N/A")
|
||||
chat_state = self.stream_chat_states.get(stream_id, "N/A")
|
||||
threshold = self.stream_threshold_status.get(stream_id, False)
|
||||
last_active_ts = self.stream_last_active.get(stream_id)
|
||||
last_interaction_ts = self.stream_last_interaction.get(stream_id)
|
||||
|
||||
self.single_stream_sub_mind.set(f"想法: {sub_mind}")
|
||||
self.single_stream_chat_state.set(f"状态: {chat_state}")
|
||||
self.single_stream_threshold.set(f"阈值以上: {'是' if threshold else '否'}")
|
||||
self.single_stream_last_active.set(f"最后活跃: {self.format_timestamp(last_active_ts)}")
|
||||
self.single_stream_last_interaction.set(f"最后交互: {self.format_timestamp(last_interaction_ts)}")
|
||||
else:
|
||||
# 如果没有选择流,则清空详情
|
||||
self.single_stream_sub_mind.set("想法: N/A")
|
||||
self.single_stream_chat_state.set("状态: N/A")
|
||||
self.single_stream_threshold.set("阈值: N/A")
|
||||
self.single_stream_last_active.set("活跃: N/A")
|
||||
self.single_stream_last_interaction.set("交互: N/A")
|
||||
|
||||
def update_display(self):
|
||||
"""主更新循环"""
|
||||
try:
|
||||
self.load_and_update_history() # 从文件加载数据并更新内部状态
|
||||
# *** 修改:分别调用两个图表的更新方法 ***
|
||||
self.update_all_streams_plot() # 更新所有流的图表
|
||||
self.update_single_stream_plot() # 更新单个流的图表
|
||||
except Exception as e:
|
||||
# 提供更详细的错误信息
|
||||
import traceback
|
||||
|
||||
error_msg = f"Error during update: {e}\n{traceback.format_exc()}"
|
||||
self.set_status(error_msg, "red")
|
||||
print(error_msg) # 打印详细错误到控制台
|
||||
|
||||
# 安排下一次刷新
|
||||
self.root.after(REFRESH_INTERVAL_MS, self.update_display)
|
||||
|
||||
def set_status(self, message: str, color: str = "grey"):
|
||||
"""更新状态栏标签"""
|
||||
# 限制状态栏消息长度
|
||||
max_len = 150
|
||||
display_message = (message[:max_len] + "...") if len(message) > max_len else message
|
||||
self.status_label.config(text=display_message, fg=color)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
# 导入 timedelta 用于默认时间范围
|
||||
from datetime import timedelta
|
||||
|
||||
root = tk.Tk()
|
||||
app = InterestMonitorApp(root)
|
||||
root.mainloop()
|
||||
92
scripts/raw_data_preprocessor.py
Normal file
92
scripts/raw_data_preprocessor.py
Normal file
@@ -0,0 +1,92 @@
|
||||
import json
|
||||
import os
|
||||
from pathlib import Path
|
||||
import sys # 新增系统模块导入
|
||||
|
||||
sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), "..")))
|
||||
from src.common.logger import get_module_logger
|
||||
|
||||
logger = get_module_logger("LPMM数据库-原始数据处理")
|
||||
|
||||
# 添加项目根目录到 sys.path
|
||||
|
||||
|
||||
def check_and_create_dirs():
|
||||
"""检查并创建必要的目录"""
|
||||
required_dirs = ["data/lpmm_raw_data", "data/imported_lpmm_data"]
|
||||
|
||||
for dir_path in required_dirs:
|
||||
if not os.path.exists(dir_path):
|
||||
os.makedirs(dir_path)
|
||||
logger.info(f"已创建目录: {dir_path}")
|
||||
|
||||
|
||||
def process_text_file(file_path):
|
||||
"""处理单个文本文件,返回段落列表"""
|
||||
with open(file_path, "r", encoding="utf-8") as f:
|
||||
raw = f.read()
|
||||
|
||||
paragraphs = []
|
||||
paragraph = ""
|
||||
for line in raw.split("\n"):
|
||||
if line.strip() == "":
|
||||
if paragraph != "":
|
||||
paragraphs.append(paragraph.strip())
|
||||
paragraph = ""
|
||||
else:
|
||||
paragraph += line + "\n"
|
||||
|
||||
if paragraph != "":
|
||||
paragraphs.append(paragraph.strip())
|
||||
|
||||
return paragraphs
|
||||
|
||||
|
||||
def main():
|
||||
# 新增用户确认提示
|
||||
print("=== 重要操作确认 ===")
|
||||
print("如果你并非第一次导入知识")
|
||||
print("请先删除data/import.json文件,备份data/openie.json文件")
|
||||
print("在进行知识库导入之前")
|
||||
print("请修改config/lpmm_config.toml中的配置项")
|
||||
confirm = input("确认继续执行?(y/n): ").strip().lower()
|
||||
if confirm != "y":
|
||||
logger.error("操作已取消")
|
||||
sys.exit(1)
|
||||
print("\n" + "=" * 40 + "\n")
|
||||
|
||||
# 检查并创建必要的目录
|
||||
check_and_create_dirs()
|
||||
|
||||
# 检查输出文件是否存在
|
||||
if os.path.exists("data/import.json"):
|
||||
logger.error("错误: data/import.json 已存在,请先处理或删除该文件")
|
||||
sys.exit(1)
|
||||
|
||||
if os.path.exists("data/openie.json"):
|
||||
logger.error("错误: data/openie.json 已存在,请先处理或删除该文件")
|
||||
sys.exit(1)
|
||||
|
||||
# 获取所有原始文本文件
|
||||
raw_files = list(Path("data/lpmm_raw_data").glob("*.txt"))
|
||||
if not raw_files:
|
||||
logger.warning("警告: data/lpmm_raw_data 中没有找到任何 .txt 文件")
|
||||
sys.exit(1)
|
||||
|
||||
# 处理所有文件
|
||||
all_paragraphs = []
|
||||
for file in raw_files:
|
||||
logger.info(f"正在处理文件: {file.name}")
|
||||
paragraphs = process_text_file(file)
|
||||
all_paragraphs.extend(paragraphs)
|
||||
|
||||
# 保存合并后的结果
|
||||
output_path = "data/import.json"
|
||||
with open(output_path, "w", encoding="utf-8") as f:
|
||||
json.dump(all_paragraphs, f, ensure_ascii=False, indent=4)
|
||||
|
||||
logger.info(f"处理完成,结果已保存到: {output_path}")
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
Reference in New Issue
Block a user