better:优化了图片展示形式

This commit is contained in:
SengokuCola
2025-06-14 15:03:09 +08:00
parent bf0e813142
commit e15a9b29ff
8 changed files with 301 additions and 76 deletions

View File

@@ -8,6 +8,7 @@ from src.person_info.person_info import PersonInfoManager, get_person_info_manag
from src.chat.utils.utils import translate_timestamp_to_human_readable
from rich.traceback import install
from src.common.database.database_model import ActionRecords
from src.common.database.database_model import Images
install(extra_lines=3)
@@ -157,7 +158,9 @@ def _build_readable_messages_internal(
merge_messages: bool = False,
timestamp_mode: str = "relative",
truncate: bool = False,
) -> Tuple[str, List[Tuple[float, str, str]]]:
pic_id_mapping: Dict[str, str] = None,
pic_counter: int = 1,
) -> Tuple[str, List[Tuple[float, str, str]], Dict[str, str], int]:
"""
内部辅助函数,构建可读消息字符串和原始消息详情列表。
@@ -167,14 +170,40 @@ def _build_readable_messages_internal(
merge_messages: 是否合并来自同一用户的连续消息。
timestamp_mode: 时间戳的显示模式 ('relative', 'absolute', etc.)。传递给 translate_timestamp_to_human_readable。
truncate: 是否根据消息的新旧程度截断过长的消息内容。
pic_id_mapping: 图片ID映射字典如果为None则创建新的
pic_counter: 图片计数器起始值
Returns:
包含格式化消息的字符串原始消息详情列表 (时间戳, 发送者名称, 内容) 的元组。
包含格式化消息的字符串原始消息详情列表、图片映射字典和更新后的计数器的元组。
"""
if not messages:
return "", []
return "", [], pic_id_mapping or {}, pic_counter
message_details_raw: List[Tuple[float, str, str]] = []
# 使用传入的映射字典,如果没有则创建新的
if pic_id_mapping is None:
pic_id_mapping = {}
current_pic_counter = pic_counter
def process_pic_ids(content: str) -> str:
"""处理内容中的图片ID将其替换为[图片x]格式"""
nonlocal current_pic_counter
# 匹配 [picid:xxxxx] 格式
pic_pattern = r'\[picid:([^\]]+)\]'
def replace_pic_id(match):
nonlocal current_pic_counter
pic_id = match.group(1)
if pic_id not in pic_id_mapping:
pic_id_mapping[pic_id] = f"图片{current_pic_counter}"
current_pic_counter += 1
return f"[{pic_id_mapping[pic_id]}]"
return re.sub(pic_pattern, replace_pic_id, content)
# 1 & 2: 获取发送者信息并提取消息组件
for msg in messages:
@@ -183,7 +212,8 @@ def _build_readable_messages_internal(
is_action = True
timestamp = msg.get("time")
content = msg.get("display_message", "")
# 对于动作记录,直接使用内容
# 对于动作记录,也处理图片ID
content = process_pic_ids(content)
message_details_raw.append((timestamp, global_config.bot.nickname, content, is_action))
continue
@@ -215,6 +245,9 @@ def _build_readable_messages_internal(
if "" in content:
content = content.replace("", "")
# 处理图片ID
content = process_pic_ids(content)
# 检查必要信息是否存在
if not all([platform, user_id, timestamp is not None]):
continue
@@ -277,7 +310,7 @@ def _build_readable_messages_internal(
message_details_raw.append((timestamp, person_name, content, False))
if not message_details_raw:
return "", []
return "", [], pic_id_mapping, current_pic_counter
message_details_raw.sort(key=lambda x: x[0]) # 按时间戳(第一个元素)升序排序,越早的消息排在前面
@@ -285,10 +318,6 @@ def _build_readable_messages_internal(
message_details_with_flags = []
for timestamp, name, content, is_action in message_details_raw:
message_details_with_flags.append((timestamp, name, content, is_action))
# print(f"content:{content}")
# print(f"is_action:{is_action}")
# print(f"message_details_with_flags:{message_details_with_flags}")
# 应用截断逻辑 (如果 truncate 为 True)
message_details: List[Tuple[float, str, str, bool]] = []
@@ -326,8 +355,6 @@ def _build_readable_messages_internal(
# 如果不截断,直接使用原始列表
message_details = message_details_with_flags
# print(f"message_details:{message_details}")
# 3: 合并连续消息 (如果 merge_messages 为 True)
merged_messages = []
if merge_messages and message_details:
@@ -388,6 +415,7 @@ def _build_readable_messages_internal(
# 4 & 5: 格式化为字符串
output_lines = []
for _i, merged in enumerate(merged_messages):
# 使用指定的 timestamp_mode 格式化时间
readable_time = translate_timestamp_to_human_readable(merged["start_time"], mode=timestamp_mode)
@@ -416,9 +444,44 @@ def _build_readable_messages_internal(
# 移除可能的多余换行,然后合并
formatted_string = "".join(output_lines).strip()
# 返回格式化后的字符串和 *应用截断后* 的 message_details 列表
# 注意:如果外部调用者需要原始未截断的内容,可能需要调整返回策略
return formatted_string, [(t, n, c) for t, n, c, is_action in message_details if not is_action]
# 返回格式化后的字符串、消息详情列表、图片映射字典和更新后的计数器
return formatted_string, [(t, n, c) for t, n, c, is_action in message_details if not is_action], pic_id_mapping, current_pic_counter
def build_pic_mapping_info(pic_id_mapping: Dict[str, str]) -> str:
"""
构建图片映射信息字符串,显示图片的具体描述内容
Args:
pic_id_mapping: 图片ID到显示名称的映射字典
Returns:
格式化的映射信息字符串
"""
if not pic_id_mapping:
return ""
mapping_lines = []
# 按图片编号排序
sorted_items = sorted(pic_id_mapping.items(), key=lambda x: int(x[1].replace("图片", "")))
for pic_id, display_name in sorted_items:
# 从数据库中获取图片描述
description = "内容正在阅读"
try:
image = Images.get_or_none(Images.image_id == pic_id)
if image and image.description:
description = image.description
except Exception as e:
# 如果查询失败,保持默认描述
pass
mapping_lines.append(f"[{display_name}] 的内容:{description}")
return "\n".join(mapping_lines)
async def build_readable_messages_with_list(
@@ -432,9 +495,15 @@ async def build_readable_messages_with_list(
将消息列表转换为可读的文本格式,并返回原始(时间戳, 昵称, 内容)列表。
允许通过参数控制格式化行为。
"""
formatted_string, details_list = _build_readable_messages_internal(
formatted_string, details_list, pic_id_mapping, _ = _build_readable_messages_internal(
messages, replace_bot_name, merge_messages, timestamp_mode, truncate
)
# 生成图片映射信息并添加到最前面
pic_mapping_info = build_pic_mapping_info(pic_id_mapping)
if pic_mapping_info:
formatted_string = f"{pic_mapping_info}\n\n{formatted_string}"
return formatted_string, details_list
@@ -503,53 +572,56 @@ def build_readable_messages(
if read_mark <= 0:
# 没有有效的 read_mark直接格式化所有消息
# for message in messages:
# print(f"message:{message}")
formatted_string, _ = _build_readable_messages_internal(
formatted_string, _, pic_id_mapping, _ = _build_readable_messages_internal(
copy_messages, replace_bot_name, merge_messages, timestamp_mode, truncate
)
# print(f"formatted_string:{formatted_string}")
return formatted_string
# 生成图片映射信息并添加到最前面
pic_mapping_info = build_pic_mapping_info(pic_id_mapping)
if pic_mapping_info:
return f"{pic_mapping_info}\n\n{formatted_string}"
else:
return formatted_string
else:
# 按 read_mark 分割消息
messages_before_mark = [msg for msg in copy_messages if msg.get("time", 0) <= read_mark]
messages_after_mark = [msg for msg in copy_messages if msg.get("time", 0) > read_mark]
# for message in messages_before_mark:
# print(f"message:{message}")
# 共享的图片映射字典和计数器
pic_id_mapping = {}
pic_counter = 1
# for message in messages_after_mark:
# print(f"message:{message}")
# 分别格式化
formatted_before, _ = _build_readable_messages_internal(
messages_before_mark, replace_bot_name, merge_messages, timestamp_mode, truncate
# 分别格式化,但使用共享的图片映射
formatted_before, _, pic_id_mapping, pic_counter = _build_readable_messages_internal(
messages_before_mark, replace_bot_name, merge_messages, timestamp_mode, truncate,
pic_id_mapping, pic_counter
)
formatted_after, _ = _build_readable_messages_internal(
messages_after_mark,
replace_bot_name,
merge_messages,
timestamp_mode,
formatted_after, _, pic_id_mapping, _ = _build_readable_messages_internal(
messages_after_mark, replace_bot_name, merge_messages, timestamp_mode, False,
pic_id_mapping, pic_counter
)
# print(f"formatted_before:{formatted_before}")
# print(f"formatted_after:{formatted_after}")
read_mark_line = "\n--- 以上消息是你已经看过---\n--- 请关注以下未读的新消息---\n"
# 生成图片映射信息
pic_mapping_info = f"图片信息:\n{build_pic_mapping_info(pic_id_mapping)}\n聊天记录信息:\n"
# 组合结果
result_parts = []
if pic_mapping_info:
result_parts.append(pic_mapping_info)
result_parts.append("\n")
if formatted_before and formatted_after:
return f"{formatted_before}{read_mark_line}{formatted_after}"
result_parts.extend([formatted_before, read_mark_line, formatted_after])
elif formatted_before:
return f"{formatted_before}{read_mark_line}"
result_parts.extend([formatted_before, read_mark_line])
elif formatted_after:
return f"{read_mark_line}{formatted_after}"
result_parts.extend([read_mark_line, formatted_after])
else:
return read_mark_line.strip()
result_parts.append(read_mark_line.strip())
return "".join(result_parts)
async def build_anonymous_messages(messages: List[Dict[str, Any]]) -> str:
@@ -564,6 +636,29 @@ async def build_anonymous_messages(messages: List[Dict[str, Any]]) -> str:
person_map = {}
current_char = ord("A")
output_lines = []
# 图片ID映射字典
pic_id_mapping = {}
pic_counter = 1
def process_pic_ids(content: str) -> str:
"""处理内容中的图片ID将其替换为[图片x]格式"""
nonlocal pic_counter
# 匹配 [picid:xxxxx] 格式
pic_pattern = r'\[picid:([^\]]+)\]'
def replace_pic_id(match):
nonlocal pic_counter
pic_id = match.group(1)
if pic_id not in pic_id_mapping:
pic_id_mapping[pic_id] = f"图片{pic_counter}"
pic_counter += 1
return f"[{pic_id_mapping[pic_id]}]"
return re.sub(pic_pattern, replace_pic_id, content)
def get_anon_name(platform, user_id):
# print(f"get_anon_name: platform:{platform}, user_id:{user_id}")
@@ -599,6 +694,9 @@ async def build_anonymous_messages(messages: List[Dict[str, Any]]) -> str:
if "" in content:
content = content.replace("", "")
# 处理图片ID
content = process_pic_ids(content)
# if not all([platform, user_id, timestamp is not None]):
# continue
@@ -650,7 +748,15 @@ async def build_anonymous_messages(messages: List[Dict[str, Any]]) -> str:
except Exception:
continue
formatted_string = "".join(output_lines).strip()
# 在最前面添加图片映射信息
final_output_lines = []
pic_mapping_info = build_pic_mapping_info(pic_id_mapping)
if pic_mapping_info:
final_output_lines.append(pic_mapping_info)
final_output_lines.append("\n\n")
final_output_lines.extend(output_lines)
formatted_string = "".join(final_output_lines).strip()
return formatted_string