better: unify the chat-history retrieval and formatting interfaces, rework the sentence-splitting function

SengokuCola
2025-04-20 23:39:06 +08:00
parent 5c26c40cb4
commit 4888ab65a7
19 changed files with 851 additions and 1591 deletions


@@ -21,6 +21,11 @@ from ...common.database import db
logger = get_module_logger("chat_utils")
def is_english_letter(char: str) -> bool:
"""检查字符是否为英文字母(忽略大小写)"""
return 'a' <= char.lower() <= 'z'
def db_message_to_str(message_dict: Dict) -> str:
logger.debug(f"message_dict: {message_dict}")
time_str = time.strftime("%m-%d %H:%M:%S", time.localtime(message_dict["time"]))
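# For reference: with this format string, time_str renders like "04-20 23:39:06"
# (month-day hour:minute:second, year omitted).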
@@ -71,7 +76,7 @@ def is_mentioned_bot_in_message(message: MessageRecv) -> tuple[bool, float]:
else:
if not is_mentioned:
# Check whether the bot was replied to
if re.match(f"回复[\s\S]*?\({global_config.BOT_QQ}\)的消息,说:", message.processed_plain_text):
if re.match("回复[\s\S]*?\((\d+)\)的消息,说:", message.processed_plain_text):
is_mentioned = True
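# For reference, a hedged sketch of what the relaxed pattern accepts
# (the sample message text below is made up; the old pattern matched only
# the bot's own QQ, the new one matches any numeric QQ):
# sample = "回复张三(12345678)的消息,说:你好"
# re.match(r"回复[\s\S]*?\((\d+)\)的消息,说:", sample)  # -> matches, group(1) == "12345678"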
# Check whether the bot is mentioned in the message content
@@ -217,97 +222,114 @@ def get_recent_group_speaker(chat_stream_id: int, sender, limit: int = 12) -> li
def split_into_sentences_w_remove_punctuation(text: str) -> List[str]:
"""将文本分割成句子,但保持书名号中的内容完整
"""将文本分割成句子,并根据概率合并
1. 识别分割点(, 。 ; 空格),但如果分割点左右都是英文字母则不分割。
2. 将文本分割成 (内容, 分隔符) 的元组。
3. 根据原始文本长度计算合并概率,概率性地合并相邻段落。
注意:此函数假定颜文字已在上层被保护。
Args:
text: 要分割的文本字符串
text: 要分割的文本字符串 (假定颜文字已被保护)
Returns:
List[str]: 分割后的句子列表
List[str]: 分割和合并后的句子列表
"""
# Convert a newline between two Chinese characters into a full stop (。)
text = re.sub(r'([\u4e00-\u9fff])\n([\u4e00-\u9fff])', r'\1。\2', text)
len_text = len(text)
if len_text < 4:
if len_text < 3:
if random.random() < 0.01:
return list(text) # if the text is very short and the random check fires, split into individual characters
else:
return [text]
# Define the separator characters
separators = {'。', ',', ' ', ',', ';'}
segments = []
current_segment = ""
# 1. Split into (content, separator) tuples
i = 0
while i < len(text):
char = text[i]
if char in separators:
# Split condition: do not split when the characters on both sides of the separator are English letters
can_split = True
if i > 0 and i < len(text) - 1:
prev_char = text[i-1]
next_char = text[i+1]
# if is_english_letter(prev_char) and is_english_letter(next_char) and char == ' ': # originally this rule was planned for spaces only; it now applies to all separators
if is_english_letter(prev_char) and is_english_letter(next_char):
can_split = False
if can_split:
# Only append when the current segment is non-empty
if current_segment:
segments.append((current_segment, char))
# If the current segment is empty but the separator is a space, still append an empty segment (preserving the space)
elif char == ' ':
segments.append(("", char))
current_segment = ""
else:
# Do not split; append the separator to the current segment
current_segment += char
else:
current_segment += char
i += 1
# Append the last segment (which has no trailing separator)
if current_segment:
segments.append((current_segment, ""))
# Filter out segments that are completely empty (neither content nor separator)
segments = [(content, sep) for content, sep in segments if content or sep]
# If nothing remains after splitting (e.g. the input was all separators and none qualified to be kept), return (kaomoji recovery now happens in the caller)
if not segments:
# recovered_text = recover_kaomoji([text], mapping) # recover kaomoji in the original text - moved to the caller
# return [s for s in recovered_text if s] # return non-empty results
return [text] if text else [] # if the original text is non-empty, return it (it may contain only unsplit characters or kaomoji placeholders)
# 2. Probabilistic merging
if len_text < 12:
split_strength = 0.2
elif len_text < 32:
split_strength = 0.6
else:
split_strength = 0.7
# The merge probability is the inverse of the split strength
merge_probability = 1.0 - split_strength
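# Worked examples of the resulting probabilities:
#   len_text = 10 -> split_strength 0.2 -> merge_probability 0.8 (short texts merge aggressively)
#   len_text = 40 -> split_strength 0.7 -> merge_probability 0.3 (long texts stay split)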
# Check whether this is a Western-text paragraph
if not is_western_paragraph(text):
# For Chinese text, normalize English commas to Chinese commas
text = text.replace(",", ",")
text = text.replace("\n", " ")
else:
# 用"|seg|"作为分割符分开
text = re.sub(r"([.!?]) +", r"\1\|seg\|", text)
text = text.replace("\n", "|seg|")
text, mapping = protect_kaomoji(text)
# print(f"处理前的文本: {text}")
merged_segments = []
idx = 0
while idx < len(segments):
current_content, current_sep = segments[idx]
text_no_1 = ""
for letter in text:
# print(f"当前字符: {letter}")
if letter in ["!", "", "?", ""]:
# print(f"当前字符: {letter}, 随机数: {random.random()}")
if random.random() < split_strength:
letter = ""
if letter in ["", ""]:
# print(f"当前字符: {letter}, 随机数: {random.random()}")
if random.random() < 1 - split_strength:
letter = ""
text_no_1 += letter
# Check whether this segment can be merged with the next one
# Conditions: not the last segment, random draw below the merge probability, and the current segment has content (avoid merging empty segments)
if idx + 1 < len(segments) and random.random() < merge_probability and current_content:
next_content, next_sep = segments[idx+1]
# Merge: (content1 + separator1 + content2, separator2)
# Only merge the text when the next segment also has content; otherwise just carry over its separator
if next_content:
merged_content = current_content + current_sep + next_content
merged_segments.append((merged_content, next_sep))
else: # the next segment has no content; keep the current content with the next segment's separator
merged_segments.append((current_content, next_sep))
# Decide independently for each comma whether to split there
sentences = [text_no_1]
new_sentences = []
for sentence in sentences:
parts = sentence.split("")
current_sentence = parts[0]
if not is_western_paragraph(current_sentence):
for part in parts[1:]:
if random.random() < split_strength:
new_sentences.append(current_sentence.strip())
current_sentence = part
else:
current_sentence += "" + part
# Handle splitting on spaces
space_parts = current_sentence.split(" ")
current_sentence = space_parts[0]
for part in space_parts[1:]:
if random.random() < split_strength:
new_sentences.append(current_sentence.strip())
current_sentence = part
else:
current_sentence += " " + part
idx += 2 # skip the next segment, it has already been merged
else:
# Handle the explicit split marker
space_parts = current_sentence.split("|seg|")
current_sentence = space_parts[0]
for part in space_parts[1:]:
new_sentences.append(current_sentence.strip())
current_sentence = part
new_sentences.append(current_sentence.strip())
sentences = [s for s in new_sentences if s] # remove empty strings
sentences = recover_kaomoji(sentences, mapping)
# No merge: append the current segment as-is
merged_segments.append((current_content, current_sep))
idx += 1
# print(f"分割后的句子: {sentences}")
sentences_done = []
for sentence in sentences:
sentence = sentence.rstrip(",")
# Western-text sentences skip this random punctuation handling
if not is_western_paragraph(sentence):
if random.random() < split_strength * 0.5:
sentence = sentence.replace(",", "").replace(",", "")
elif random.random() < split_strength:
sentence = sentence.replace(",", " ").replace(",", " ")
sentences_done.append(sentence)
# Extract the final sentence contents
final_sentences = [content for content, sep in merged_segments if content] # keep only segments that have content
logger.debug(f"sentences after processing: {sentences_done}")
return sentences_done
# Clean up any empty strings that may have been introduced
final_sentences = [s for s in final_sentences if s]
logger.debug(f"分割并合并后的句子: {final_sentences}")
return final_sentences
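A hedged usage sketch of the reworked splitter (results vary across runs because merging is probabilistic; kaomoji are assumed to have been protected by the caller):

parts = split_into_sentences_w_remove_punctuation("你好,今天天气不错。明天见")
# without a merge: ["你好", "今天天气不错", "明天见"]
# after one merge: ["你好,今天天气不错", "明天见"]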
def random_remove_punctuation(text: str) -> str:
@@ -341,13 +363,11 @@ def process_llm_response(text: str) -> List[str]:
# Protect kaomoji first
protected_text, kaomoji_mapping = protect_kaomoji(text)
logger.trace(f"保护颜文字后的文本: {protected_text}")
# Extract content wrapped in () or []
pattern = re.compile(r"[\(\[\(].*?[\)\]\)]")
# Extract content wrapped in () or [] that also contains Chinese
pattern = re.compile(r"[\(\[\((?=.*[\u4e00-\u9fff]).*?[\)\]\)]")
# _extracted_contents = pattern.findall(text)
_extracted_contents = pattern.findall(protected_text) # search the protected text
extracted_contents = pattern.findall(protected_text) # search the protected text
# Remove () and [] together with their wrapped content
# cleaned_text = pattern.sub("", text)
cleaned_text = pattern.sub("", protected_text)
if cleaned_text == "":
@@ -358,12 +378,11 @@ def process_llm_response(text: str) -> List[str]:
# Further processing of the cleaned text
max_length = global_config.response_max_length * 2
max_sentence_num = global_config.response_max_sentence_num
if len(cleaned_text) > max_length and not is_western_paragraph(cleaned_text):
logger.warning(f"回复过长 ({len(cleaned_text)} 字符),返回默认回复")
return ["懒得说"]
elif len(cleaned_text) > 200:
logger.warning(f"回复过长 ({len(cleaned_text)} 字符),返回默认回复")
return ["懒得说"]
# 如果基本上是中文,则进行长度过滤
if get_western_ratio(cleaned_text) < 0.1:
if len(cleaned_text) > max_length:
logger.warning(f"回复过长 ({len(cleaned_text)} 字符),返回默认回复")
return ["懒得说"]
typo_generator = ChineseTypoGenerator(
error_rate=global_config.chinese_typo_error_rate,
@@ -390,10 +409,13 @@ def process_llm_response(text: str) -> List[str]:
if len(sentences) > max_sentence_num:
logger.warning(f"分割后消息数量过多 ({len(sentences)} 条),返回默认回复")
return [f"{global_config.BOT_NICKNAME}不知道哦"]
# sentences.extend(extracted_contents)
if extracted_contents:
for content in extracted_contents:
sentences.append(content)
# After all sentences are processed, restore kaomoji placeholders in the list
sentences = recover_kaomoji(sentences, kaomoji_mapping)
logger.debug(f"final sentences: {sentences}")
return sentences
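# Hedged summary of the process_llm_response flow visible across these hunks:
# protect_kaomoji -> extract and strip ()/[] content -> length filter ->
# ChineseTypoGenerator -> sentence splitting -> re-append extracted content -> recover_kaomoji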
@@ -552,14 +574,24 @@ def recover_kaomoji(sentences, placeholder_to_kaomoji):
return recovered_sentences
def is_western_char(char):
"""检测是否为西文字符"""
return len(char.encode("utf-8")) <= 2
def is_western_paragraph(paragraph):
"""检测是否为西文字符段落"""
return all(is_western_char(char) for char in paragraph if char.isalnum())
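# Examples (note str.isalnum() is True for Chinese characters, which encode to 3 UTF-8 bytes):
#   is_western_paragraph("Hello, world!") -> True
#   is_western_paragraph("你好 hello")     -> False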
def get_western_ratio(paragraph):
"""计算段落中字母数字字符的西文比例
原理:检查段落中字母数字字符的西文比例
通过is_english_letter函数判断每个字符是否为西文
只检查字母数字字符,忽略标点符号和空格等非字母数字字符
Args:
paragraph: 要检查的文本段落
Returns:
float: 西文字符比例(0.0-1.0)如果没有字母数字字符则返回0.0
"""
alnum_chars = [char for char in paragraph if char.isalnum()]
if not alnum_chars:
return 0.0
western_count = sum(1 for char in alnum_chars if is_english_letter(char))
return western_count / len(alnum_chars)
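# Example: get_western_ratio("hello你好") counts 7 alphanumeric characters,
# 5 of them English letters, so it returns 5/7 ≈ 0.714.
# process_llm_response above treats text with a ratio below 0.1 as essentially Chinese.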
def count_messages_between(start_time: float, end_time: float, stream_id: str) -> tuple[int, int]:
@@ -673,19 +705,17 @@ def translate_timestamp_to_human_readable(timestamp: float, mode: str = "normal"
diff = now - timestamp
if diff < 20:
return "刚刚:"
return "刚刚:\n"
elif diff < 60:
return f"{int(diff)}秒前:"
elif diff < 1800:
return f"{int(diff / 60)}分钟前:"
return f"{int(diff)}秒前:\n"
elif diff < 3600:
return f"{int(diff / 60)}分钟前:\n"
elif diff < 86400:
return f"{int(diff / 3600)}小时前:\n"
elif diff < 604800:
elif diff < 86400 * 2:
return f"{int(diff / 86400)}天前:\n"
else:
return time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(timestamp)) + ":"
return time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(timestamp)) + ":\n"
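Hedged examples of the new return values (diff is seconds since the message; strings taken from the branches above):

# diff = 10     -> "刚刚:\n"
# diff = 90     -> "1分钟前:\n"
# diff = 7200   -> "2小时前:\n"
# diff = 100000 -> "1天前:\n"
# diff >= 2 days -> time.strftime("%Y-%m-%d %H:%M:%S", ...) + ":\n"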
def parse_text_timestamps(text: str, mode: str = "normal") -> str: