Merge branch 'dev' into fix-kaomoji-missing-bug

This commit is contained in:
SengokuCola
2025-04-16 19:32:32 +08:00
committed by GitHub
45 changed files with 596 additions and 271 deletions

View File

@@ -8,7 +8,7 @@ import jieba
import numpy as np
from src.common.logger import get_module_logger
from ..models.utils_model import LLM_request
from ..models.utils_model import LLMRequest
from ..utils.typo_generator import ChineseTypoGenerator
from ..config.config import global_config
from .message import MessageRecv, Message
@@ -38,21 +38,35 @@ def db_message_to_str(message_dict: Dict) -> str:
return result
def is_mentioned_bot_in_message(message: MessageRecv) -> bool:
def is_mentioned_bot_in_message(message: MessageRecv) -> tuple[bool, float]:
"""检查消息是否提到了机器人"""
keywords = [global_config.BOT_NICKNAME]
nicknames = global_config.BOT_ALIAS_NAMES
reply_probability = 0
reply_probability = 0.0
is_at = False
is_mentioned = False
if (
message.message_info.additional_config is not None
and message.message_info.additional_config.get("is_mentioned") is not None
):
try:
reply_probability = float(message.message_info.additional_config.get("is_mentioned"))
is_mentioned = True
return is_mentioned, reply_probability
except Exception as e:
logger.warning(e)
logger.warning(
f"消息中包含不合理的设置 is_mentioned: {message.message_info.additional_config.get('is_mentioned')}"
)
# 判断是否被@
if re.search(f"@[\s\S]*?id:{global_config.BOT_QQ}", message.processed_plain_text):
is_at = True
is_mentioned = True
if is_at and global_config.at_bot_inevitable_reply:
reply_probability = 1
reply_probability = 1.0
logger.info("被@回复概率设置为100%")
else:
if not is_mentioned:
@@ -61,7 +75,7 @@ def is_mentioned_bot_in_message(message: MessageRecv) -> bool:
is_mentioned = True
# 判断内容中是否被提及
message_content = re.sub(r"\@[\s\S]*?(\d+)", "", message.processed_plain_text)
message_content = re.sub(r"@[\s\S]*?(\d+)", "", message.processed_plain_text)
message_content = re.sub(r"回复[\s\S]*?\((\d+)\)的消息,说: ", "", message_content)
for keyword in keywords:
if keyword in message_content:
@@ -70,14 +84,14 @@ def is_mentioned_bot_in_message(message: MessageRecv) -> bool:
if nickname in message_content:
is_mentioned = True
if is_mentioned and global_config.mentioned_bot_inevitable_reply:
reply_probability = 1
reply_probability = 1.0
logger.info("被提及回复概率设置为100%")
return is_mentioned, reply_probability
async def get_embedding(text, request_type="embedding"):
"""获取文本的embedding向量"""
llm = LLM_request(model=global_config.embedding, request_type=request_type)
llm = LLMRequest(model=global_config.embedding, request_type=request_type)
# return llm.get_embedding_sync(text)
try:
embedding = await llm.get_embedding(text)
@@ -91,7 +105,7 @@ async def get_recent_group_messages(chat_id: str, limit: int = 12) -> list:
"""从数据库获取群组最近的消息记录
Args:
group_id: 群组ID
chat_id: 群组ID
limit: 获取消息数量默认12条
Returns:
@@ -331,6 +345,7 @@ def process_llm_response(text: str) -> List[str]:
pattern = re.compile(r"[\(\[].*?[\)\]]")
# _extracted_contents = pattern.findall(text)
_extracted_contents = pattern.findall(protected_text) # 在保护后的文本上查找
# 去除 () 和 [] 及其包裹的内容
# cleaned_text = pattern.sub("", text)
cleaned_text = pattern.sub("", protected_text)
@@ -493,16 +508,16 @@ def protect_kaomoji(sentence):
"""
kaomoji_pattern = re.compile(
r"("
r"[\(\[(【]" # 左括号
r"[(\[(【]" # 左括号
r"[^()\[\]()【】]*?" # 非括号字符(惰性匹配)
r"[^\u4e00-\u9fa5a-zA-Z0-9\s]" # 非中文、非英文、非数字、非空格字符(必须包含至少一个)
r"[^一-龥a-zA-Z0-9\s]" # 非中文、非英文、非数字、非空格字符(必须包含至少一个)
r"[^()\[\]()【】]*?" # 非括号字符(惰性匹配)
r"[\)\])】]" # 右括号
r"[\)\])】" # 右括号
r"]"
r")"
r"|"
r"("
r"[▼▽・ᴥω・﹏^><≧≦ ̄`´∀ヮДд︿﹀へ。゚╥╯╰︶︹•⁄]{2,15}"
r")"
r"([▼▽・ᴥω・﹏^><≧≦ ̄`´∀ヮДд︿﹀へ。゚╥╯╰︶︹•⁄]{2,15"
r"}"
)
kaomoji_matches = kaomoji_pattern.findall(sentence)
@@ -636,3 +651,142 @@ def count_messages_between(start_time: float, end_time: float, stream_id: str) -
except Exception as e:
logger.error(f"计算消息数量时出错: {str(e)}")
return 0, 0
def translate_timestamp_to_human_readable(timestamp: float, mode: str = "normal") -> str:
"""将时间戳转换为人类可读的时间格式
Args:
timestamp: 时间戳
mode: 转换模式,"normal"为标准格式,"relative"为相对时间格式
Returns:
str: 格式化后的时间字符串
"""
if mode == "normal":
return time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(timestamp))
elif mode == "relative":
now = time.time()
diff = now - timestamp
if diff < 20:
return "刚刚:"
elif diff < 60:
return f"{int(diff)}秒前:"
elif diff < 1800:
return f"{int(diff / 60)}分钟前:"
elif diff < 3600:
return f"{int(diff / 60)}分钟前:\n"
elif diff < 86400:
return f"{int(diff / 3600)}小时前:\n"
elif diff < 604800:
return f"{int(diff / 86400)}天前:\n"
else:
return time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(timestamp)) + ":"
def parse_text_timestamps(text: str, mode: str = "normal") -> str:
"""解析文本中的时间戳并转换为可读时间格式
Args:
text: 包含时间戳的文本,时间戳应以[]包裹
mode: 转换模式传递给translate_timestamp_to_human_readable"normal""relative"
Returns:
str: 替换后的文本
转换规则:
- normal模式: 将文本中所有时间戳转换为可读格式
- lite模式:
- 第一个和最后一个时间戳必须转换
- 以5秒为间隔划分时间段每段最多转换一个时间戳
- 不转换的时间戳替换为空字符串
"""
# 匹配[数字]或[数字.数字]格式的时间戳
pattern = r"\[(\d+(?:\.\d+)?)\]"
# 找出所有匹配的时间戳
matches = list(re.finditer(pattern, text))
if not matches:
return text
# normal模式: 直接转换所有时间戳
if mode == "normal":
result_text = text
for match in matches:
timestamp = float(match.group(1))
readable_time = translate_timestamp_to_human_readable(timestamp, "normal")
# 由于替换会改变文本长度,需要使用正则替换而非直接替换
pattern_instance = re.escape(match.group(0))
result_text = re.sub(pattern_instance, readable_time, result_text, count=1)
return result_text
else:
# lite模式: 按5秒间隔划分并选择性转换
result_text = text
# 提取所有时间戳及其位置
timestamps = [(float(m.group(1)), m) for m in matches]
timestamps.sort(key=lambda x: x[0]) # 按时间戳升序排序
if not timestamps:
return text
# 获取第一个和最后一个时间戳
first_timestamp, first_match = timestamps[0]
last_timestamp, last_match = timestamps[-1]
# 将时间范围划分成5秒间隔的时间段
time_segments = {}
# 对所有时间戳按15秒间隔分组
for ts, match in timestamps:
segment_key = int(ts // 15) # 将时间戳除以15取整作为时间段的键
if segment_key not in time_segments:
time_segments[segment_key] = []
time_segments[segment_key].append((ts, match))
# 记录需要转换的时间戳
to_convert = []
# 从每个时间段中选择一个时间戳进行转换
for _, segment_timestamps in time_segments.items():
# 选择这个时间段中的第一个时间戳
to_convert.append(segment_timestamps[0])
# 确保第一个和最后一个时间戳在转换列表中
first_in_list = False
last_in_list = False
for ts, _ in to_convert:
if ts == first_timestamp:
first_in_list = True
if ts == last_timestamp:
last_in_list = True
if not first_in_list:
to_convert.append((first_timestamp, first_match))
if not last_in_list:
to_convert.append((last_timestamp, last_match))
# 创建需要转换的时间戳集合,用于快速查找
to_convert_set = {match.group(0) for _, match in to_convert}
# 首先替换所有不需要转换的时间戳为空字符串
for _, match in timestamps:
if match.group(0) not in to_convert_set:
pattern_instance = re.escape(match.group(0))
result_text = re.sub(pattern_instance, "", result_text, count=1)
# 按照时间戳原始顺序排序,避免替换时位置错误
to_convert.sort(key=lambda x: x[1].start())
# 执行替换
# 由于替换会改变文本长度,从后向前替换
to_convert.reverse()
for ts, match in to_convert:
readable_time = translate_timestamp_to_human_readable(ts, "relative")
pattern_instance = re.escape(match.group(0))
result_text = re.sub(pattern_instance, readable_time, result_text, count=1)
return result_text