feat(reply): 引入统一格式过滤器,优化回复内容清理逻辑

This commit is contained in:
Windpicker-owo
2025-11-10 14:12:11 +08:00
parent 87f4b10959
commit 5d4ff6507a
3 changed files with 74 additions and 129 deletions

View File

@@ -1841,8 +1841,9 @@ class DefaultReplyer:
# 移除 [SPLIT] 标记,防止消息被分割 # 移除 [SPLIT] 标记,防止消息被分割
content = content.replace("[SPLIT]", "") content = content.replace("[SPLIT]", "")
# 应用增强的格式过滤器 # 应用统一的格式过滤器
content = self._apply_system_format_filter(content) from src.chat.utils.utils import filter_system_format_content
content = filter_system_format_content(content)
logger.debug(f"replyer生成内容: {content}") logger.debug(f"replyer生成内容: {content}")
return content, reasoning_content, model_name, tool_calls return content, reasoning_content, model_name, tool_calls
@@ -2129,71 +2130,6 @@ class DefaultReplyer:
except Exception as e: except Exception as e:
logger.error(f"存储聊天记忆失败: {e}") logger.error(f"存储聊天记忆失败: {e}")
def _apply_system_format_filter(self, content: str) -> str:
"""
应用增强的系统格式过滤器,移除各种系统格式化文本
此方法过滤以下类型的系统格式化内容:
1. 回复格式:[回复xxx]xxx
2. 表情包格式:[表情包xxx]
3. 图片格式:[图片:xxx]
4. @格式:@<xxx>
5. 原有的[回复开头格式
Args:
content: 原始内容
Returns:
过滤后的内容
"""
import re
if not content:
return content
original_content = content
cleaned_content = content.strip()
# 1. 移除回复格式:[回复xxx]xxx各种变体
# 匹配所有包含"],说:"格式的回复
cleaned_content = re.sub(r"\[回复[^\]]*\],说:\s*", "", cleaned_content)
# 匹配 [回复<xxx:数字>]xxx 格式
cleaned_content = re.sub(r"\[回复<[^>]*>\],说:\s*", "", cleaned_content)
# 2. 处理原有的[回复开头格式(保持向后兼容)
# 注意:这步要在上面处理完成后再执行,避免冲突
if cleaned_content.startswith("[回复"):
last_bracket_index = cleaned_content.rfind("]")
if last_bracket_index != -1:
cleaned_content = cleaned_content[last_bracket_index + 1 :].strip()
# 3. 移除表情包格式:[表情包xxx]
cleaned_content = re.sub(r"\[表情包:[^\]]*\]", "", cleaned_content)
# 4. 移除图片格式:[图片:xxx]
cleaned_content = re.sub(r"\[图片:[^\]]*\]", "", cleaned_content)
# 5. 移除@格式:@<xxx>
cleaned_content = re.sub(r"@<[^>]*>", "", cleaned_content)
# 6. 移除其他可能的系统格式
# [表情包(描述生成失败)] 等错误格式
cleaned_content = re.sub(r"\[表情包\([^)]*\)\]", "", cleaned_content)
# [图片(描述生成失败)] 等错误格式
cleaned_content = re.sub(r"\[图片\([^)]*\)\]", "", cleaned_content)
# 清理多余空格
cleaned_content = re.sub(r"\s+", " ", cleaned_content).strip()
# 记录过滤操作
if cleaned_content != original_content.strip():
logger.info(
f"[格式过滤器] 检测到并清理了系统格式化文本。"
f"原始内容: '{original_content}', "
f"清理后: '{cleaned_content}'"
)
return cleaned_content
def weighted_sample_no_replacement(items, weights, k) -> list: def weighted_sample_no_replacement(items, weights, k) -> list:

View File

@@ -928,67 +928,68 @@ def assign_message_ids_flexible(
# result3 = assign_message_ids_flexible(messages, prefix="ts", use_timestamp=True) # result3 = assign_message_ids_flexible(messages, prefix="ts", use_timestamp=True)
# # 结果: [{'id': 'ts123a1b', 'message': 'Hello'}, {'id': 'ts123c2d', 'message': 'World'}, {'id': 'ts123e3f', 'message': 'Test message'}] # # 结果: [{'id': 'ts123a1b', 'message': 'Hello'}, {'id': 'ts123c2d', 'message': 'World'}, {'id': 'ts123e3f', 'message': 'Test message'}]
def parse_keywords_string(keywords_input) -> list[str]:
"""
统一的关键词解析函数,支持多种格式的关键词字符串解析
支持的格式: def filter_system_format_content(content: str | None) -> str:
1. 字符串列表格式:'["utils.py", "修改", "代码", "动作"]' """
2. 斜杠分隔格式:'utils.py/修改/代码/动作' 过滤系统格式化内容,移除回复、@、图片、表情包等系统生成的格式文本
3. 逗号分隔格式:'utils.py,修改,代码,动作'
4. 空格分隔格式:'utils.py 修改 代码 动作' 此方法过滤以下类型的系统格式化内容:
5. 已经是列表的情况:["utils.py", "修改", "代码", "动作"] 1. 回复格式:[回复xxx]xxx
6. JSON格式字符串'{"keywords": ["utils.py", "修改", "代码", "动作"]}' 2. 表情包格式:[表情包xxx]
3. 图片格式:[图片:xxx]
4. @格式:@<xxx>
5. 错误格式:[表情包(...)]、[图片(...)]
6. [回复开头的格式
Args: Args:
keywords_input: 关键词输入,可以是字符串或列表 content: 原始内容
Returns: Returns:
list[str]: 解析后的关键词列表,去除空白项 过滤后的纯文本内容
""" """
if not keywords_input: if not content:
return [] return ""
# 如果已经是列表,直接处理 original_content = content
if isinstance(keywords_input, list): cleaned_content = content.strip()
return [str(k).strip() for k in keywords_input if str(k).strip()]
# 转换为字符串处理 # 1. 移除回复格式:[回复xxx]xxx各种变体
keywords_str = str(keywords_input).strip() # 匹配所有包含"],说:"格式的回复
if not keywords_str: cleaned_content = re.sub(r"\[回复[^\]]*\],说:\s*", "", cleaned_content)
return [] # 匹配 [回复<xxx:数字>]xxx 格式
cleaned_content = re.sub(r"\[回复<[^>]*>\],说:\s*", "", cleaned_content)
try: # 2. 处理原有的[回复开头格式(保持向后兼容)
# 尝试作为JSON对象解析支持 {"keywords": [...]} 格式) # 注意:这步要在上面处理完成后再执行,避免冲突
import json if cleaned_content.startswith("[回复"):
json_data = json.loads(keywords_str) last_bracket_index = cleaned_content.rfind("]")
if isinstance(json_data, dict) and "keywords" in json_data: if last_bracket_index != -1:
keywords_list = json_data["keywords"] cleaned_content = cleaned_content[last_bracket_index + 1 :].strip()
if isinstance(keywords_list, list):
return [str(k).strip() for k in keywords_list if str(k).strip()]
elif isinstance(json_data, list):
# 直接是JSON数组格式
return [str(k).strip() for k in json_data if str(k).strip()]
except (json.JSONDecodeError, ValueError):
pass
try: # 3. 移除表情包格式:[表情包xxx]
# 尝试使用 ast.literal_eval 解析支持Python字面量格式 cleaned_content = re.sub(r"\[表情包:[^\]]*\]", "", cleaned_content)
import ast
parsed = ast.literal_eval(keywords_str)
if isinstance(parsed, list):
return [str(k).strip() for k in parsed if str(k).strip()]
except (ValueError, SyntaxError):
pass
# 尝试不同的分隔符 # 4. 移除图片格式:[图片:xxx]
separators = ['/', ',', ' ', '|', ';'] cleaned_content = re.sub(r"\[图片:[^\]]*\]", "", cleaned_content)
for separator in separators: # 5. 移除@格式:@<xxx>
if separator in keywords_str: cleaned_content = re.sub(r"@<[^>]*>", "", cleaned_content)
keywords_list = [k.strip() for k in keywords_str.split(separator) if k.strip()]
if len(keywords_list) > 1: # 确保分割有效
return keywords_list
# 如果没有分隔符,返回单个关键词 # 6. 移除其他可能的系统格式
return [keywords_str] if keywords_str else [] # [表情包(描述生成失败)] 等错误格式
cleaned_content = re.sub(r"\[表情包\([^)]*\)\]", "", cleaned_content)
# [图片(描述生成失败)] 等错误格式
cleaned_content = re.sub(r"\[图片\([^)]*\)\]", "", cleaned_content)
# 清理多余空格
cleaned_content = re.sub(r"\s+", " ", cleaned_content).strip()
# 记录过滤操作
if cleaned_content != original_content.strip():
logger.info(
f"[系统格式过滤器] 检测到并清理了系统格式化文本。"
f"原始内容: '{original_content}', "
f"清理后: '{cleaned_content}'"
)
return cleaned_content

View File

@@ -482,7 +482,15 @@ class ProactiveThinkingPlanner:
return None return None
logger.info(f"生成回复成功: {response[:50]}...") logger.info(f"生成回复成功: {response[:50]}...")
return response.strip()
# 应用格式过滤器,确保回复内容不包含系统格式化文本
from src.chat.utils.utils import filter_system_format_content
filtered_response = filter_system_format_content(response.strip())
if filtered_response != response.strip():
logger.debug(f"主动思考回复已过滤系统格式: '{response.strip()}' -> '{filtered_response}'")
return filtered_response
except Exception as e: except Exception as e:
logger.error(f"生成回复失败: {e}", exc_info=True) logger.error(f"生成回复失败: {e}", exc_info=True)