feat:优化关键词提取,优化at和回复的解析

This commit is contained in:
SengokuCola
2025-07-25 16:51:13 +08:00
parent 4cc64a2ce1
commit 6900a8b269
5 changed files with 218 additions and 129 deletions

View File

@@ -12,6 +12,7 @@ from src.chat.message_receive.storage import MessageStorage
from src.chat.heart_flow.heartflow import heartflow
from src.chat.utils.utils import is_mentioned_bot_in_message
from src.chat.utils.timer_calculator import Timer
from src.chat.utils.chat_message_builder import replace_user_references_in_content
from src.common.logger import get_logger
from src.person_info.relationship_manager import get_relationship_manager
from src.mood.mood_manager import mood_manager
@@ -148,6 +149,14 @@ class HeartFCMessageReceiver:
# 如果消息中包含图片标识,则将 [picid:...] 替换为 [图片]
picid_pattern = r"\[picid:([^\]]+)\]"
processed_plain_text = re.sub(picid_pattern, "[图片]", message.processed_plain_text)
# 应用用户引用格式替换,将回复<aaa:bbb>和@<aaa:bbb>格式转换为可读格式
processed_plain_text = replace_user_references_in_content(
processed_plain_text,
message.message_info.platform,
is_async=False,
replace_bot_name=True
)
logger.info(f"[{mes_name}]{userinfo.user_nickname}:{processed_plain_text}[兴趣度:{interested_rate:.2f}]") # type: ignore

View File

@@ -305,7 +305,7 @@ class Hippocampus:
memories.sort(key=lambda x: x[2], reverse=True)
return memories
async def get_keywords_from_text(self, text: str, fast_retrieval: bool = False) -> list:
async def get_keywords_from_text(self, text: str) -> list:
"""从文本中提取关键词。
Args:
@@ -317,50 +317,45 @@ class Hippocampus:
if not text:
return []
if fast_retrieval:
# 使用jieba分词提取关键词
# 使用LLM提取关键词 - 根据详细文本长度分布优化topic_num计算
text_length = len(text)
topic_num:str|list[int] = None
if text_length <= 5:
words = jieba.cut(text)
# 过滤掉停用词和单字词
keywords = [word for word in words if len(word) > 1]
# 去重
keywords = list(set(keywords))
# 限制关键词数量
logger.debug(f"提取关键词: {keywords}")
keywords = list(set(keywords))[:3] # 限制最多3个关键词
logger.info(f"提取关键词: {keywords}")
return keywords
elif text_length <= 10:
topic_num = [1,3] # 6-10字符: 1个关键词 (27.18%的文本)
elif text_length <= 20:
topic_num = [2,4] # 11-20字符: 2个关键词 (22.76%的文本)
elif text_length <= 30:
topic_num = [3,5] # 21-30字符: 3个关键词 (10.33%的文本)
elif text_length <= 50:
topic_num = [4,5] # 31-50字符: 4个关键词 (9.79%的文本)
else:
# 使用LLM提取关键词 - 根据详细文本长度分布优化topic_num计算
text_length = len(text)
topic_num:str|list[int] = None
if text_length <= 5:
topic_num = [1,2] # 1-5字符: 1个关键词 (26.57%的文本)
elif text_length <= 10:
topic_num = 2 # 6-10字符: 1个关键词 (27.18%的文本)
elif text_length <= 20:
topic_num = [2,3] # 11-20字符: 2个关键词 (22.76%的文本)
elif text_length <= 30:
topic_num = 3 # 21-30字符: 3个关键词 (10.33%的文本)
elif text_length <= 50:
topic_num = 4 # 31-50字符: 4个关键词 (9.79%的文本)
else:
topic_num = 5 # 51+字符: 5个关键词 (其余长文本)
# logger.info(f"提取关键词数量: {topic_num}")
topics_response, (reasoning_content, model_name) = await self.model_summary.generate_response_async(
self.find_topic_llm(text, topic_num)
)
topic_num = 5 # 51+字符: 5个关键词 (其余长文本)
topics_response, (reasoning_content, model_name) = await self.model_summary.generate_response_async(
self.find_topic_llm(text, topic_num)
)
# 提取关键词
keywords = re.findall(r"<([^>]+)>", topics_response)
if not keywords:
keywords = []
else:
keywords = [
keyword.strip()
for keyword in ",".join(keywords).replace("", ",").replace("", ",").replace(" ", ",").split(",")
if keyword.strip()
]
return keywords
# 提取关键词
keywords = re.findall(r"<([^>]+)>", topics_response)
if not keywords:
keywords = []
else:
keywords = [
keyword.strip()
for keyword in ",".join(keywords).replace("", ",").replace("", ",").replace(" ", ",").split(",")
if keyword.strip()
]
logger.info(f"提取关键词: {keywords}")
return keywords
async def get_memory_from_text(
@@ -388,7 +383,7 @@ class Hippocampus:
- memory_items: list, 该主题下的记忆项列表
- similarity: float, 与文本的相似度
"""
keywords = await self.get_keywords_from_text(text, fast_retrieval)
keywords = await self.get_keywords_from_text(text)
# 过滤掉不存在于记忆图中的关键词
valid_keywords = [keyword for keyword in keywords if keyword in self.memory_graph.G]
@@ -710,7 +705,7 @@ class Hippocampus:
Returns:
float: 激活节点数与总节点数的比值
"""
keywords = await self.get_keywords_from_text(text, fast_retrieval)
keywords = await self.get_keywords_from_text(text)
# 过滤掉不存在于记忆图中的关键词
valid_keywords = [keyword for keyword in keywords if keyword in self.memory_graph.G]

View File

@@ -17,7 +17,7 @@ from src.chat.message_receive.uni_message_sender import HeartFCSender
from src.chat.utils.timer_calculator import Timer # <--- Import Timer
from src.chat.utils.utils import get_chat_type_and_target_info
from src.chat.utils.prompt_builder import Prompt, global_prompt_manager
from src.chat.utils.chat_message_builder import build_readable_messages, get_raw_msg_before_timestamp_with_chat
from src.chat.utils.chat_message_builder import build_readable_messages, get_raw_msg_before_timestamp_with_chat, replace_user_references_in_content
from src.chat.express.expression_selector import expression_selector
from src.chat.knowledge.knowledge_lib import qa_manager
from src.chat.memory_system.memory_activator import MemoryActivator
@@ -629,6 +629,14 @@ class DefaultReplyer:
mood_prompt = ""
sender, target = self._parse_reply_target(reply_to)
target = replace_user_references_in_content(
target,
chat_stream.platform,
is_async=False,
replace_bot_name=True
)
# 构建action描述 (如果启用planner)
action_descriptions = ""

View File

@@ -2,7 +2,7 @@ import time # 导入 time 模块以获取当前时间
import random
import re
from typing import List, Dict, Any, Tuple, Optional
from typing import List, Dict, Any, Tuple, Optional, Union, Callable
from rich.traceback import install
from src.config.config import global_config
@@ -15,6 +15,155 @@ from src.chat.utils.utils import translate_timestamp_to_human_readable,assign_me
install(extra_lines=3)
def replace_user_references_in_content(
content: str,
platform: str,
name_resolver: Union[Callable[[str, str], str], Callable[[str, str], Any]] = None,
is_async: bool = False,
replace_bot_name: bool = True
) -> Union[str, Any]:
"""
替换内容中的用户引用格式,包括回复<aaa:bbb>和@<aaa:bbb>格式
Args:
content: 要处理的内容字符串
platform: 平台标识
name_resolver: 名称解析函数,接收(platform, user_id)参数,返回用户名称
如果为None则使用默认的person_info_manager
is_async: 是否为异步模式
replace_bot_name: 是否将机器人的user_id替换为"机器人昵称(你)"
Returns:
处理后的内容字符串同步模式或awaitable对象异步模式
"""
if is_async:
return _replace_user_references_async(content, platform, name_resolver, replace_bot_name)
else:
return _replace_user_references_sync(content, platform, name_resolver, replace_bot_name)
def _replace_user_references_sync(
content: str,
platform: str,
name_resolver: Optional[Callable[[str, str], str]] = None,
replace_bot_name: bool = True
) -> str:
"""同步版本的用户引用替换"""
if name_resolver is None:
person_info_manager = get_person_info_manager()
def default_resolver(platform: str, user_id: str) -> str:
# 检查是否是机器人自己
if replace_bot_name and user_id == global_config.bot.qq_account:
return f"{global_config.bot.nickname}(你)"
person_id = PersonInfoManager.get_person_id(platform, user_id)
return person_info_manager.get_value_sync(person_id, "person_name") or user_id
name_resolver = default_resolver
# 处理回复<aaa:bbb>格式
reply_pattern = r"回复<([^:<>]+):([^:<>]+)>"
match = re.search(reply_pattern, content)
if match:
aaa = match.group(1)
bbb = match.group(2)
try:
# 检查是否是机器人自己
if replace_bot_name and bbb == global_config.bot.qq_account:
reply_person_name = f"{global_config.bot.nickname}(你)"
else:
reply_person_name = name_resolver(platform, bbb) or aaa
content = re.sub(reply_pattern, f"回复 {reply_person_name}", content, count=1)
except Exception:
# 如果解析失败,使用原始昵称
content = re.sub(reply_pattern, f"回复 {aaa}", content, count=1)
# 处理@<aaa:bbb>格式
at_pattern = r"@<([^:<>]+):([^:<>]+)>"
at_matches = list(re.finditer(at_pattern, content))
if at_matches:
new_content = ""
last_end = 0
for m in at_matches:
new_content += content[last_end:m.start()]
aaa = m.group(1)
bbb = m.group(2)
try:
# 检查是否是机器人自己
if replace_bot_name and bbb == global_config.bot.qq_account:
at_person_name = f"{global_config.bot.nickname}(你)"
else:
at_person_name = name_resolver(platform, bbb) or aaa
new_content += f"@{at_person_name}"
except Exception:
# 如果解析失败,使用原始昵称
new_content += f"@{aaa}"
last_end = m.end()
new_content += content[last_end:]
content = new_content
return content
async def _replace_user_references_async(
content: str,
platform: str,
name_resolver: Optional[Callable[[str, str], Any]] = None,
replace_bot_name: bool = True
) -> str:
"""异步版本的用户引用替换"""
if name_resolver is None:
person_info_manager = get_person_info_manager()
async def default_resolver(platform: str, user_id: str) -> str:
# 检查是否是机器人自己
if replace_bot_name and user_id == global_config.bot.qq_account:
return f"{global_config.bot.nickname}(你)"
person_id = PersonInfoManager.get_person_id(platform, user_id)
return await person_info_manager.get_value(person_id, "person_name") or user_id
name_resolver = default_resolver
# 处理回复<aaa:bbb>格式
reply_pattern = r"回复<([^:<>]+):([^:<>]+)>"
match = re.search(reply_pattern, content)
if match:
aaa = match.group(1)
bbb = match.group(2)
try:
# 检查是否是机器人自己
if replace_bot_name and bbb == global_config.bot.qq_account:
reply_person_name = f"{global_config.bot.nickname}(你)"
else:
reply_person_name = await name_resolver(platform, bbb) or aaa
content = re.sub(reply_pattern, f"回复 {reply_person_name}", content, count=1)
except Exception:
# 如果解析失败,使用原始昵称
content = re.sub(reply_pattern, f"回复 {aaa}", content, count=1)
# 处理@<aaa:bbb>格式
at_pattern = r"@<([^:<>]+):([^:<>]+)>"
at_matches = list(re.finditer(at_pattern, content))
if at_matches:
new_content = ""
last_end = 0
for m in at_matches:
new_content += content[last_end:m.start()]
aaa = m.group(1)
bbb = m.group(2)
try:
# 检查是否是机器人自己
if replace_bot_name and bbb == global_config.bot.qq_account:
at_person_name = f"{global_config.bot.nickname}(你)"
else:
at_person_name = await name_resolver(platform, bbb) or aaa
new_content += f"@{at_person_name}"
except Exception:
# 如果解析失败,使用原始昵称
new_content += f"@{aaa}"
last_end = m.end()
new_content += content[last_end:]
content = new_content
return content
def get_raw_msg_by_timestamp(
timestamp_start: float, timestamp_end: float, limit: int = 0, limit_mode: str = "latest"
) -> List[Dict[str, Any]]:
@@ -374,33 +523,8 @@ def _build_readable_messages_internal(
else:
person_name = "某人"
# 检查是否有 回复<aaa:bbb> 字段
reply_pattern = r"回复<([^:<>]+):([^:<>]+)>"
match = re.search(reply_pattern, content)
if match:
aaa: str = match[1]
bbb: str = match[2]
reply_person_id = PersonInfoManager.get_person_id(platform, bbb)
reply_person_name = person_info_manager.get_value_sync(reply_person_id, "person_name") or aaa
# 在内容前加上回复信息
content = re.sub(reply_pattern, lambda m, name=reply_person_name: f"回复 {name}", content, count=1)
# 检查是否有 @<aaa:bbb> 字段 @<{member_info.get('nickname')}:{member_info.get('user_id')}>
at_pattern = r"@<([^:<>]+):([^:<>]+)>"
at_matches = list(re.finditer(at_pattern, content))
if at_matches:
new_content = ""
last_end = 0
for m in at_matches:
new_content += content[last_end : m.start()]
aaa = m.group(1)
bbb = m.group(2)
at_person_id = PersonInfoManager.get_person_id(platform, bbb)
at_person_name = person_info_manager.get_value_sync(at_person_id, "person_name") or aaa
new_content += f"@{at_person_name}"
last_end = m.end()
new_content += content[last_end:]
content = new_content
# 使用独立函数处理用户引用格式
content = replace_user_references_in_content(content, platform, is_async=False, replace_bot_name=replace_bot_name)
target_str = "这是QQ的一个功能用于提及某人但没那么明显"
if target_str in content and random.random() < 0.6:
@@ -916,38 +1040,14 @@ async def build_anonymous_messages(messages: List[Dict[str, Any]]) -> str:
anon_name = get_anon_name(platform, user_id)
# print(f"anon_name:{anon_name}")
# 处理 回复<aaa:bbb>
reply_pattern = r"回复<([^:<>]+):([^:<>]+)>"
match = re.search(reply_pattern, content)
if match:
# print(f"发现回复match:{match}")
bbb = match.group(2)
# 使用独立函数处理用户引用格式,传入自定义的匿名名称解析器
def anon_name_resolver(platform: str, user_id: str) -> str:
try:
anon_reply = get_anon_name(platform, bbb)
# print(f"anon_reply:{anon_reply}")
return get_anon_name(platform, user_id)
except Exception:
anon_reply = "?"
content = re.sub(reply_pattern, f"回复 {anon_reply}", content, count=1)
# 处理 @<aaa:bbb>无嵌套def
at_pattern = r"@<([^:<>]+):([^:<>]+)>"
at_matches = list(re.finditer(at_pattern, content))
if at_matches:
# print(f"发现@match:{at_matches}")
new_content = ""
last_end = 0
for m in at_matches:
new_content += content[last_end : m.start()]
bbb = m.group(2)
try:
anon_at = get_anon_name(platform, bbb)
# print(f"anon_at:{anon_at}")
except Exception:
anon_at = "?"
new_content += f"@{anon_at}"
last_end = m.end()
new_content += content[last_end:]
content = new_content
return "?"
content = replace_user_references_in_content(content, platform, anon_name_resolver, is_async=False, replace_bot_name=False)
header = f"{anon_name}"
output_lines.append(header)

View File

@@ -19,6 +19,7 @@
await send_api.custom_message("video", video_data, "123456", True)
"""
import asyncio
import traceback
import time
import difflib
@@ -30,7 +31,7 @@ from src.common.logger import get_logger
from src.chat.message_receive.chat_stream import get_chat_manager
from src.chat.message_receive.uni_message_sender import HeartFCSender
from src.chat.message_receive.message import MessageSending, MessageRecv
from src.chat.utils.chat_message_builder import get_raw_msg_before_timestamp_with_chat
from src.chat.utils.chat_message_builder import get_raw_msg_before_timestamp_with_chat, replace_user_references_in_content
from src.person_info.person_info import get_person_info_manager
from maim_message import Seg, UserInfo
from src.config.config import global_config
@@ -183,32 +184,8 @@ async def _find_reply_message(target_stream, reply_to: str) -> Optional[MessageR
if person_name == sender:
translate_text = message["processed_plain_text"]
# 检查是否有 回复<aaa:bbb> 字段
reply_pattern = r"回复<([^:<>]+):([^:<>]+)>"
if match := re.search(reply_pattern, translate_text):
aaa = match.group(1)
bbb = match.group(2)
reply_person_id = get_person_info_manager().get_person_id(platform, bbb)
reply_person_name = await get_person_info_manager().get_value(reply_person_id, "person_name") or aaa
# 在内容前加上回复信息
translate_text = re.sub(reply_pattern, f"回复 {reply_person_name}", translate_text, count=1)
# 检查是否有 @<aaa:bbb> 字段
at_pattern = r"@<([^:<>]+):([^:<>]+)>"
at_matches = list(re.finditer(at_pattern, translate_text))
if at_matches:
new_content = ""
last_end = 0
for m in at_matches:
new_content += translate_text[last_end : m.start()]
aaa = m.group(1)
bbb = m.group(2)
at_person_id = get_person_info_manager().get_person_id(platform, bbb)
at_person_name = await get_person_info_manager().get_value(at_person_id, "person_name") or aaa
new_content += f"@{at_person_name}"
last_end = m.end()
new_content += translate_text[last_end:]
translate_text = new_content
# 使用独立函数处理用户引用格式
translate_text = await replace_user_references_in_content(translate_text, platform, is_async=True)
similarity = difflib.SequenceMatcher(None, text, translate_text).ratio()
if similarity >= 0.9: