通过以下改动修复嵌入生成过程中的事件循环相关问题: - 在 EmbeddingStore._get_embedding 中,改为同步创建-使用-销毁的新事件循环模式,彻底避免嵌套事件循环问题 - 调整批量嵌入 _get_embeddings_batch_threaded,确保每个线程使用独立、短生命周期的事件循环 - 新增 force_new 参数,LLM 请求嵌入任务时强制创建新的客户端实例,减少跨循环对象复用 - 在 OpenAI 客户端的 embedding 调用处补充详细日志,方便排查网络连接异常 - get_embedding() 每次都重建 LLMRequest,降低实例在多个事件循环中穿梭的概率 此次改动虽然以同步风格“硬掰”异步接口,但对现有接口零破坏,确保了向量数据库及相关知识检索功能的稳定性。(还有就是把的脚本文件夹移回来了)
394 lines
13 KiB
Python
394 lines
13 KiB
Python
import time
|
||
import sys
|
||
import os
|
||
import re
|
||
from typing import Dict, List, Tuple, Optional
|
||
from datetime import datetime
|
||
# Add project root to Python path
|
||
project_root = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
|
||
sys.path.insert(0, project_root)
|
||
from src.common.database.database_model import Messages, ChatStreams #noqa
|
||
|
||
|
||
def contains_emoji_or_image_tags(text: str) -> bool:
|
||
"""Check if text contains [表情包xxxxx] or [图片xxxxx] tags"""
|
||
if not text:
|
||
return False
|
||
|
||
# 检查是否包含 [表情包] 或 [图片] 标记
|
||
emoji_pattern = r'\[表情包[^\]]*\]'
|
||
image_pattern = r'\[图片[^\]]*\]'
|
||
|
||
return bool(re.search(emoji_pattern, text) or re.search(image_pattern, text))
|
||
|
||
|
||
def clean_reply_text(text: str) -> str:
|
||
"""Remove reply references like [回复 xxxx...] from text"""
|
||
if not text:
|
||
return text
|
||
|
||
# 匹配 [回复 xxxx...] 格式的内容
|
||
# 使用非贪婪匹配,匹配到第一个 ] 就停止
|
||
cleaned_text = re.sub(r'\[回复[^\]]*\]', '', text)
|
||
|
||
# 去除多余的空白字符
|
||
cleaned_text = cleaned_text.strip()
|
||
|
||
return cleaned_text
|
||
|
||
|
||
def get_chat_name(chat_id: str) -> str:
|
||
"""Get chat name from chat_id by querying ChatStreams table directly"""
|
||
try:
|
||
chat_stream = ChatStreams.get_or_none(ChatStreams.stream_id == chat_id)
|
||
if chat_stream is None:
|
||
return f"未知聊天 ({chat_id})"
|
||
|
||
if chat_stream.group_name:
|
||
return f"{chat_stream.group_name} ({chat_id})"
|
||
elif chat_stream.user_nickname:
|
||
return f"{chat_stream.user_nickname}的私聊 ({chat_id})"
|
||
else:
|
||
return f"未知聊天 ({chat_id})"
|
||
except Exception:
|
||
return f"查询失败 ({chat_id})"
|
||
|
||
|
||
def format_timestamp(timestamp: float) -> str:
|
||
"""Format timestamp to readable date string"""
|
||
try:
|
||
return datetime.fromtimestamp(timestamp).strftime("%Y-%m-%d %H:%M:%S")
|
||
except (ValueError, OSError):
|
||
return "未知时间"
|
||
|
||
|
||
def calculate_text_length_distribution(messages) -> Dict[str, int]:
|
||
"""Calculate distribution of processed_plain_text length"""
|
||
distribution = {
|
||
'0': 0, # 空文本
|
||
'1-5': 0, # 极短文本
|
||
'6-10': 0, # 很短文本
|
||
'11-20': 0, # 短文本
|
||
'21-30': 0, # 较短文本
|
||
'31-50': 0, # 中短文本
|
||
'51-70': 0, # 中等文本
|
||
'71-100': 0, # 较长文本
|
||
'101-150': 0, # 长文本
|
||
'151-200': 0, # 很长文本
|
||
'201-300': 0, # 超长文本
|
||
'301-500': 0, # 极长文本
|
||
'501-1000': 0, # 巨长文本
|
||
'1000+': 0 # 超巨长文本
|
||
}
|
||
|
||
for msg in messages:
|
||
if msg.processed_plain_text is None:
|
||
continue
|
||
|
||
# 排除包含表情包或图片标记的消息
|
||
if contains_emoji_or_image_tags(msg.processed_plain_text):
|
||
continue
|
||
|
||
# 清理文本中的回复引用
|
||
cleaned_text = clean_reply_text(msg.processed_plain_text)
|
||
length = len(cleaned_text)
|
||
|
||
if length == 0:
|
||
distribution['0'] += 1
|
||
elif length <= 5:
|
||
distribution['1-5'] += 1
|
||
elif length <= 10:
|
||
distribution['6-10'] += 1
|
||
elif length <= 20:
|
||
distribution['11-20'] += 1
|
||
elif length <= 30:
|
||
distribution['21-30'] += 1
|
||
elif length <= 50:
|
||
distribution['31-50'] += 1
|
||
elif length <= 70:
|
||
distribution['51-70'] += 1
|
||
elif length <= 100:
|
||
distribution['71-100'] += 1
|
||
elif length <= 150:
|
||
distribution['101-150'] += 1
|
||
elif length <= 200:
|
||
distribution['151-200'] += 1
|
||
elif length <= 300:
|
||
distribution['201-300'] += 1
|
||
elif length <= 500:
|
||
distribution['301-500'] += 1
|
||
elif length <= 1000:
|
||
distribution['501-1000'] += 1
|
||
else:
|
||
distribution['1000+'] += 1
|
||
|
||
return distribution
|
||
|
||
|
||
def get_text_length_stats(messages) -> Dict[str, float]:
|
||
"""Calculate basic statistics for processed_plain_text length"""
|
||
lengths = []
|
||
null_count = 0
|
||
excluded_count = 0 # 被排除的消息数量
|
||
|
||
for msg in messages:
|
||
if msg.processed_plain_text is None:
|
||
null_count += 1
|
||
elif contains_emoji_or_image_tags(msg.processed_plain_text):
|
||
# 排除包含表情包或图片标记的消息
|
||
excluded_count += 1
|
||
else:
|
||
# 清理文本中的回复引用
|
||
cleaned_text = clean_reply_text(msg.processed_plain_text)
|
||
lengths.append(len(cleaned_text))
|
||
|
||
if not lengths:
|
||
return {
|
||
'count': 0,
|
||
'null_count': null_count,
|
||
'excluded_count': excluded_count,
|
||
'min': 0,
|
||
'max': 0,
|
||
'avg': 0,
|
||
'median': 0
|
||
}
|
||
|
||
lengths.sort()
|
||
count = len(lengths)
|
||
|
||
return {
|
||
'count': count,
|
||
'null_count': null_count,
|
||
'excluded_count': excluded_count,
|
||
'min': min(lengths),
|
||
'max': max(lengths),
|
||
'avg': sum(lengths) / count,
|
||
'median': lengths[count // 2] if count % 2 == 1 else (lengths[count // 2 - 1] + lengths[count // 2]) / 2
|
||
}
|
||
|
||
|
||
def get_available_chats() -> List[Tuple[str, str, int]]:
|
||
"""Get all available chats with message counts"""
|
||
try:
|
||
# 获取所有有消息的chat_id,排除特殊类型消息
|
||
chat_counts = {}
|
||
for msg in Messages.select(Messages.chat_id).distinct():
|
||
chat_id = msg.chat_id
|
||
count = Messages.select().where(
|
||
(Messages.chat_id == chat_id) &
|
||
(Messages.is_emoji != 1) &
|
||
(Messages.is_picid != 1) &
|
||
(Messages.is_command != 1)
|
||
).count()
|
||
if count > 0:
|
||
chat_counts[chat_id] = count
|
||
|
||
# 获取聊天名称
|
||
result = []
|
||
for chat_id, count in chat_counts.items():
|
||
chat_name = get_chat_name(chat_id)
|
||
result.append((chat_id, chat_name, count))
|
||
|
||
# 按消息数量排序
|
||
result.sort(key=lambda x: x[2], reverse=True)
|
||
return result
|
||
except Exception as e:
|
||
print(f"获取聊天列表失败: {e}")
|
||
return []
|
||
|
||
|
||
def get_time_range_input() -> Tuple[Optional[float], Optional[float]]:
|
||
"""Get time range input from user"""
|
||
print("\n时间范围选择:")
|
||
print("1. 最近1天")
|
||
print("2. 最近3天")
|
||
print("3. 最近7天")
|
||
print("4. 最近30天")
|
||
print("5. 自定义时间范围")
|
||
print("6. 不限制时间")
|
||
|
||
choice = input("请选择时间范围 (1-6): ").strip()
|
||
|
||
now = time.time()
|
||
|
||
if choice == "1":
|
||
return now - 24*3600, now
|
||
elif choice == "2":
|
||
return now - 3*24*3600, now
|
||
elif choice == "3":
|
||
return now - 7*24*3600, now
|
||
elif choice == "4":
|
||
return now - 30*24*3600, now
|
||
elif choice == "5":
|
||
print("请输入开始时间 (格式: YYYY-MM-DD HH:MM:SS):")
|
||
start_str = input().strip()
|
||
print("请输入结束时间 (格式: YYYY-MM-DD HH:MM:SS):")
|
||
end_str = input().strip()
|
||
|
||
try:
|
||
start_time = datetime.strptime(start_str, "%Y-%m-%d %H:%M:%S").timestamp()
|
||
end_time = datetime.strptime(end_str, "%Y-%m-%d %H:%M:%S").timestamp()
|
||
return start_time, end_time
|
||
except ValueError:
|
||
print("时间格式错误,将不限制时间范围")
|
||
return None, None
|
||
else:
|
||
return None, None
|
||
|
||
|
||
def get_top_longest_messages(messages, top_n: int = 10) -> List[Tuple[str, int, str, str]]:
|
||
"""Get top N longest messages"""
|
||
message_lengths = []
|
||
|
||
for msg in messages:
|
||
if msg.processed_plain_text is not None:
|
||
# 排除包含表情包或图片标记的消息
|
||
if contains_emoji_or_image_tags(msg.processed_plain_text):
|
||
continue
|
||
|
||
# 清理文本中的回复引用
|
||
cleaned_text = clean_reply_text(msg.processed_plain_text)
|
||
length = len(cleaned_text)
|
||
chat_name = get_chat_name(msg.chat_id)
|
||
time_str = format_timestamp(msg.time)
|
||
# 截取前100个字符作为预览
|
||
preview = cleaned_text[:100] + "..." if len(cleaned_text) > 100 else cleaned_text
|
||
message_lengths.append((chat_name, length, time_str, preview))
|
||
|
||
# 按长度排序,取前N个
|
||
message_lengths.sort(key=lambda x: x[1], reverse=True)
|
||
return message_lengths[:top_n]
|
||
|
||
|
||
def analyze_text_lengths(chat_id: Optional[str] = None, start_time: Optional[float] = None, end_time: Optional[float] = None) -> None:
|
||
"""Analyze processed_plain_text lengths with optional filters"""
|
||
|
||
# 构建查询条件,排除特殊类型的消息
|
||
query = Messages.select().where(
|
||
(Messages.is_emoji != 1) &
|
||
(Messages.is_picid != 1) &
|
||
(Messages.is_command != 1)
|
||
)
|
||
|
||
if chat_id:
|
||
query = query.where(Messages.chat_id == chat_id)
|
||
|
||
if start_time:
|
||
query = query.where(Messages.time >= start_time)
|
||
|
||
if end_time:
|
||
query = query.where(Messages.time <= end_time)
|
||
|
||
messages = list(query)
|
||
|
||
if not messages:
|
||
print("没有找到符合条件的消息")
|
||
return
|
||
|
||
# 计算统计信息
|
||
distribution = calculate_text_length_distribution(messages)
|
||
stats = get_text_length_stats(messages)
|
||
top_longest = get_top_longest_messages(messages, 10)
|
||
|
||
# 显示结果
|
||
print("\n=== Processed Plain Text 长度分析结果 ===")
|
||
print("(已排除表情、图片ID、命令类型消息,已排除[表情包]和[图片]标记消息,已清理回复引用)")
|
||
if chat_id:
|
||
print(f"聊天: {get_chat_name(chat_id)}")
|
||
else:
|
||
print("聊天: 全部聊天")
|
||
|
||
if start_time and end_time:
|
||
print(f"时间范围: {format_timestamp(start_time)} 到 {format_timestamp(end_time)}")
|
||
elif start_time:
|
||
print(f"时间范围: {format_timestamp(start_time)} 之后")
|
||
elif end_time:
|
||
print(f"时间范围: {format_timestamp(end_time)} 之前")
|
||
else:
|
||
print("时间范围: 不限制")
|
||
|
||
print("\n基本统计:")
|
||
print(f"总消息数量: {len(messages)}")
|
||
print(f"有文本消息数量: {stats['count']}")
|
||
print(f"空文本消息数量: {stats['null_count']}")
|
||
print(f"被排除的消息数量: {stats['excluded_count']}")
|
||
if stats['count'] > 0:
|
||
print(f"最短长度: {stats['min']} 字符")
|
||
print(f"最长长度: {stats['max']} 字符")
|
||
print(f"平均长度: {stats['avg']:.2f} 字符")
|
||
print(f"中位数长度: {stats['median']:.2f} 字符")
|
||
|
||
print("\n文本长度分布:")
|
||
total = stats['count']
|
||
if total > 0:
|
||
for range_name, count in distribution.items():
|
||
if count > 0:
|
||
percentage = count / total * 100
|
||
print(f"{range_name} 字符: {count} ({percentage:.2f}%)")
|
||
|
||
# 显示最长的消息
|
||
if top_longest:
|
||
print(f"\n最长的 {len(top_longest)} 条消息:")
|
||
for i, (chat_name, length, time_str, preview) in enumerate(top_longest, 1):
|
||
print(f"{i}. [{chat_name}] {time_str}")
|
||
print(f" 长度: {length} 字符")
|
||
print(f" 预览: {preview}")
|
||
print()
|
||
|
||
|
||
def interactive_menu() -> None:
|
||
"""Interactive menu for text length analysis"""
|
||
|
||
while True:
|
||
print("\n" + "="*50)
|
||
print("Processed Plain Text 长度分析工具")
|
||
print("="*50)
|
||
print("1. 分析全部聊天")
|
||
print("2. 选择特定聊天分析")
|
||
print("q. 退出")
|
||
|
||
choice = input("\n请选择分析模式 (1-2, q): ").strip()
|
||
|
||
if choice.lower() == 'q':
|
||
print("再见!")
|
||
break
|
||
|
||
chat_id = None
|
||
|
||
if choice == "2":
|
||
# 显示可用的聊天列表
|
||
chats = get_available_chats()
|
||
if not chats:
|
||
print("没有找到聊天数据")
|
||
continue
|
||
|
||
print(f"\n可用的聊天 (共{len(chats)}个):")
|
||
for i, (_cid, name, count) in enumerate(chats, 1):
|
||
print(f"{i}. {name} ({count}条消息)")
|
||
|
||
try:
|
||
chat_choice = int(input(f"\n请选择聊天 (1-{len(chats)}): ").strip())
|
||
if 1 <= chat_choice <= len(chats):
|
||
chat_id = chats[chat_choice - 1][0]
|
||
else:
|
||
print("无效选择")
|
||
continue
|
||
except ValueError:
|
||
print("请输入有效数字")
|
||
continue
|
||
|
||
elif choice != "1":
|
||
print("无效选择")
|
||
continue
|
||
|
||
# 获取时间范围
|
||
start_time, end_time = get_time_range_input()
|
||
|
||
# 执行分析
|
||
analyze_text_lengths(chat_id, start_time, end_time)
|
||
|
||
input("\n按回车键继续...")
|
||
|
||
|
||
if __name__ == "__main__":
|
||
interactive_menu() |