feat(affinity-flow): 重构消息处理以使用StreamContext对象

重构AFC消息处理系统,将基于字典的消息数据传递改为直接使用StreamContext对象。主要变更包括:

- 修改AFCManager的process_message方法为process_stream_context,直接接收StreamContext对象
- 在chatter中重构消息处理逻辑,直接从StreamContext获取未读和历史消息
- 移除批量消息处理功能,改为单次StreamContext处理
- 在message_manager中简化消息处理流程,直接传递StreamContext对象
- 添加未读消息清理机制,防止异常情况下消息一直未读

同时优化兴趣度评分系统的参数:
- 调整回复阈值从0.55到0.56
- 增加最大不回复次数从15到20
- 提升每次不回复的概率增加从0.01到0.02
- 优化提及奖励从3.0降到1.0
- 调整回复后的不回复计数减少从1到3

BREAKING CHANGE: AFCManager的process_message方法已重命名为process_stream_context,参数从message_data改为context对象
This commit is contained in:
Windpicker-owo
2025-09-18 22:27:29 +08:00
parent ddf0d08fac
commit a2225cad3a
12 changed files with 476 additions and 277 deletions

View File

@@ -49,14 +49,14 @@ class AFCManager:
return self.affinity_flow_chatters[stream_id]
async def process_message(self, stream_id: str, message_data: dict) -> Dict[str, any]:
"""处理消息"""
async def process_stream_context(self, stream_id: str, context) -> Dict[str, any]:
"""处理StreamContext对象"""
try:
# 获取或创建聊天处理器
chatter = self.get_or_create_chatter(stream_id)
# 处理消息
result = await chatter.process_message(message_data)
# 处理StreamContext
result = await chatter.process_stream_context(context)
# 更新统计
self.manager_stats["total_messages_processed"] += 1
@@ -66,20 +66,13 @@ class AFCManager:
return result
except Exception as e:
logger.error(f"处理消息时出错: {e}\n{traceback.format_exc()}")
logger.error(f"处理StreamContext时出错: {e}\n{traceback.format_exc()}")
return {
"success": False,
"error_message": str(e),
"executed_count": 0,
}
async def process_messages_batch(self, stream_id: str, messages_data: List[dict]) -> List[Dict[str, any]]:
"""批量处理消息"""
results = []
for message_data in messages_data:
result = await self.process_message(stream_id, message_data)
results.append(result)
return results
def get_chatter_stats(self, stream_id: str) -> Optional[Dict[str, any]]:
"""获取聊天处理器统计"""

View File

@@ -42,28 +42,31 @@ class AffinityFlowChatter:
}
self.last_activity_time = time.time()
async def process_message(self, message_data: dict) -> Dict[str, any]:
async def process_stream_context(self, context) -> Dict[str, any]:
"""
处理单个消息
处理StreamContext对象
Args:
message_data: 消息数据字典,包含:
- message_info: 消息基本信息
- processed_plain_text: 处理后的纯文本
- context_messages: 上下文消息(历史+未读)
- unread_messages: 未读消息列表
context: StreamContext对象包含聊天流的所有消息信息
Returns:
处理结果字典
"""
try:
# 取未读消息用于兴趣度计算
unread_messages = message_data.get("unread_messages", [])
# 取未读消息和历史消息
unread_messages = context.get_unread_messages()
history_messages = context.get_history_messages()
# 使用增强版规划器处理消息,传递未读消息用于兴趣度计算
# 准备消息数据
message_data = {
"unread_messages": unread_messages,
"history_messages": history_messages
}
# 使用增强版规划器处理消息
actions, target_message = await self.planner.plan(
mode=ChatMode.FOCUS,
unread_messages=unread_messages
message_data=message_data
)
self.stats["plans_created"] += 1
@@ -76,7 +79,7 @@ class AffinityFlowChatter:
# 更新统计
self.stats["messages_processed"] += 1
self.stats["actions_executed"] += execution_result.get("executed_count", 0)
self.stats["successful_executions"] += 1 # TODO:假设成功
self.stats["successful_executions"] += 1
self.last_activity_time = time.time()
result = {
@@ -89,12 +92,12 @@ class AffinityFlowChatter:
**execution_result,
}
logger.info(f"聊天流 {self.stream_id} 消息处理成功: 动作数={result['actions_count']}, 未读消息={result['unread_messages_processed']}")
logger.info(f"聊天流 {self.stream_id} StreamContext处理成功: 动作数={result['actions_count']}, 未读消息={result['unread_messages_processed']}")
return result
except Exception as e:
logger.error(f"亲和力聊天处理器 {self.stream_id} 处理消息时出错: {e}\n{traceback.format_exc()}")
logger.error(f"亲和力聊天处理器 {self.stream_id} 处理StreamContext时出错: {e}\n{traceback.format_exc()}")
self.stats["failed_executions"] += 1
self.last_activity_time = time.time()

View File

@@ -30,13 +30,13 @@ class InterestScoringSystem:
}
# 评分阈值
self.reply_threshold = 0.55 # 默认回复阈值
self.reply_threshold = 0.56 # 默认回复阈值
self.mention_threshold = 0.3 # 提及阈值
# 连续不回复概率提升
self.no_reply_count = 0
self.max_no_reply_count = 15
self.probability_boost_per_no_reply = 0.01 # 每次不回复增加15%概率
self.max_no_reply_count = 20
self.probability_boost_per_no_reply = 0.02 # 每次不回复增加5%概率
# 用户关系数据
self.user_relationships: Dict[str, float] = {} # user_id -> relationship_score
@@ -149,7 +149,7 @@ class InterestScoringSystem:
# 返回匹配分数,考虑置信度和匹配标签数量
match_count_bonus = min(len(match_result.matched_tags) * 0.05, 0.3) # 每多匹配一个标签+0.05,最高+0.3
final_score = match_result.overall_score * 1.3 * match_result.confidence + match_count_bonus
final_score = match_result.overall_score * 1.15 * match_result.confidence + match_count_bonus
logger.debug(f"⚖️ 最终分数计算: 总分({match_result.overall_score:.3f}) × 1.3 × 置信度({match_result.confidence:.3f}) + 标签数量奖励({match_count_bonus:.3f}) = {final_score:.3f}")
return final_score
else:
@@ -227,7 +227,7 @@ class InterestScoringSystem:
return 0.0
if msg.is_mentioned or (bot_nickname and bot_nickname in msg.processed_plain_text):
return 3.0
return 1.0
return 0.0
@@ -273,7 +273,7 @@ class InterestScoringSystem:
old_count = self.no_reply_count
if did_reply:
self.no_reply_count = max(0, self.no_reply_count - 1)
self.no_reply_count = max(0, self.no_reply_count - 3)
action = "✅ reply动作可用"
else:
self.no_reply_count += 1

View File

@@ -433,9 +433,9 @@ class BotInterestManager:
low_similarity_count = 0
# 分级相似度阈值
high_threshold = 0.5
medium_threshold = 0.3
low_threshold = 0.15
high_threshold = 0.55
medium_threshold = 0.47
low_threshold = 0.3
logger.debug(f"🔍 使用分级相似度阈值: 高={high_threshold}, 中={medium_threshold}, 低={low_threshold}")
@@ -449,7 +449,7 @@ class BotInterestManager:
# 根据相似度等级应用不同的加成
if similarity > high_threshold:
# 高相似度:强加成
enhanced_score = weighted_score * 1.5
enhanced_score = weighted_score * 1.8
match_count += 1
high_similarity_count += 1
result.add_match(tag.tag_name, enhanced_score, [tag.tag_name])
@@ -457,7 +457,7 @@ class BotInterestManager:
elif similarity > medium_threshold:
# 中相似度:中等加成
enhanced_score = weighted_score * 1.2
enhanced_score = weighted_score * 1.4
match_count += 1
medium_similarity_count += 1
result.add_match(tag.tag_name, enhanced_score, [tag.tag_name])
@@ -465,7 +465,7 @@ class BotInterestManager:
elif similarity > low_threshold:
# 低相似度:轻微加成
enhanced_score = weighted_score * 1.05
enhanced_score = weighted_score * 1.15
match_count += 1
low_similarity_count += 1
result.add_match(tag.tag_name, enhanced_score, [tag.tag_name])

View File

@@ -5,13 +5,16 @@
import asyncio
import time
import traceback
from typing import Dict, Optional, Any
from typing import Dict, Optional, Any, TYPE_CHECKING
from src.common.logger import get_logger
from src.common.data_models.database_data_model import DatabaseMessages
from src.common.data_models.message_manager_data_model import StreamContext, MessageManagerStats, StreamStats
from src.chat.affinity_flow.afc_manager import afc_manager
if TYPE_CHECKING:
from src.common.data_models.message_manager_data_model import StreamContext
logger = get_logger("message_manager")
@@ -120,44 +123,21 @@ class MessageManager:
logger.debug(f"开始处理聊天流 {stream_id}{len(unread_messages)} 条未读消息")
# 获取上下文消息
context_messages = context.get_context_messages()
# 批量处理消息
messages_data = []
for msg in unread_messages:
message_data = {
"message_info": {
"platform": msg.user_info.platform,
"user_info": {
"user_id": msg.user_info.user_id,
"user_nickname": msg.user_info.user_nickname,
"user_cardname": msg.user_info.user_cardname,
"platform": msg.user_info.platform
},
"group_info": {
"group_id": msg.group_info.group_id,
"group_name": msg.group_info.group_name,
"group_platform": msg.group_info.group_platform
} if msg.group_info else None
},
"processed_plain_text": msg.processed_plain_text,
"context_messages": [ctx_msg.flatten() for ctx_msg in context_messages],
"unread_messages": unread_messages # 传递原始对象而不是字典
}
messages_data.append(message_data)
# 发送到AFC处理器
if messages_data:
results = await afc_manager.process_messages_batch(stream_id, messages_data)
# 处理结果,标记消息为已读
for i, result in enumerate(results):
if result.get("success", False):
msg_id = unread_messages[i].message_id
context.mark_message_as_read(msg_id)
self.stats.total_processed_messages += 1
logger.debug(f"消息 {msg_id} 处理完成,标记为已读")
# 直接使用StreamContext对象进行处理
if unread_messages:
try:
# 发送到AFC处理器传递StreamContext对象
results = await afc_manager.process_stream_context(stream_id, context)
# 处理结果,标记消息为已读
if results.get("success", False):
self._clear_all_unread_messages(context)
except Exception as e:
# 发生异常时,清除所有未读消息,防止意外关闭等导致消息一直未读
logger.error(f"处理聊天流 {stream_id} 时发生异常,将清除所有未读消息: {e}")
self._clear_all_unread_messages(context)
raise
logger.debug(f"聊天流 {stream_id} 消息处理完成")
@@ -227,6 +207,23 @@ class MessageManager:
del self.stream_contexts[stream_id]
logger.info(f"清理不活跃聊天流: {stream_id}")
def _clear_all_unread_messages(self, context: StreamContext):
"""清除指定上下文中的所有未读消息,防止意外情况导致消息一直未读"""
unread_messages = context.get_unread_messages()
if not unread_messages:
return
logger.warning(f"正在清除 {len(unread_messages)} 条未读消息")
# 将所有未读消息标记为已读并移动到历史记录
for msg in unread_messages[:]: # 使用切片复制避免迭代时修改列表
try:
context.mark_message_as_read(msg.message_id)
self.stats.total_processed_messages += 1
logger.info(f"强制清除消息 {msg.message_id},标记为已读")
except Exception as e:
logger.error(f"清除消息 {msg.message_id} 时出错: {e}")
# 创建全局消息管理器实例
message_manager = MessageManager()

View File

@@ -1,6 +1,7 @@
"""
PlanFilter: 接收 Plan 对象,根据不同模式的逻辑进行筛选,决定最终要执行的动作。
"""
import orjson
import time
import traceback
@@ -33,9 +34,7 @@ class PlanFilter:
"""
def __init__(self):
self.planner_llm = LLMRequest(
model_set=model_config.model_task_config.planner, request_type="planner"
)
self.planner_llm = LLMRequest(model_set=model_config.model_task_config.planner, request_type="planner")
self.last_obs_time_mark = 0.0
async def filter(self, reply_not_available: bool, plan: Plan) -> Plan:
@@ -55,9 +54,9 @@ class PlanFilter:
try:
parsed_json = orjson.loads(repair_json(llm_content))
except orjson.JSONDecodeError:
prased_json = {"action": "no_action", "reason": "返回内容无法解析为JSON"}
parsed_json = {"action": "no_action", "reason": "返回内容无法解析为JSON"}
logger.debug(f"墨墨在这里加了日志 -> 解析后的 JSON: {parsed_json}")
if "reply" in plan.available_actions and reply_not_available:
# 如果reply动作不可用但llm返回的仍然有reply则改为no_reply
if isinstance(parsed_json, dict) and parsed_json.get("action") == "reply":
@@ -86,28 +85,18 @@ class PlanFilter:
if action_type in reply_action_types:
if not reply_action_added:
final_actions.extend(
await self._parse_single_action(
item, used_message_id_list, plan
)
)
final_actions.extend(await self._parse_single_action(item, used_message_id_list, plan))
reply_action_added = True
else:
# 非回复类动作直接添加
final_actions.extend(
await self._parse_single_action(
item, used_message_id_list, plan
)
)
final_actions.extend(await self._parse_single_action(item, used_message_id_list, plan))
plan.decided_actions = self._filter_no_actions(final_actions)
except Exception as e:
logger.error(f"筛选 Plan 时出错: {e}\n{traceback.format_exc()}")
plan.decided_actions = [
ActionPlannerInfo(action_type="no_action", reasoning=f"筛选时出错: {e}")
]
plan.decided_actions = [ActionPlannerInfo(action_type="no_action", reasoning=f"筛选时出错: {e}")]
logger.debug(f"墨墨在这里加了日志 -> filter 出口 decided_actions: {plan.decided_actions}")
return plan
@@ -136,7 +125,7 @@ class PlanFilter:
if plan.mode == ChatMode.PROACTIVE:
long_term_memory_block = await self._get_long_term_memory_context()
chat_content_block, message_id_list = build_readable_messages_with_id(
messages=[msg.flatten() for msg in plan.chat_history],
timestamp_mode="normal",
@@ -165,7 +154,13 @@ class PlanFilter:
)
return prompt, message_id_list
chat_content_block, message_id_list = build_readable_messages_with_id(
# 构建已读/未读历史消息
read_history_block, unread_history_block, message_id_list = await self._build_read_unread_history_blocks(
plan
)
# 为了兼容性保留原有的chat_content_block
chat_content_block, _ = build_readable_messages_with_id(
messages=[msg.flatten() for msg in plan.chat_history],
timestamp_mode="normal",
read_mark=self.last_obs_time_mark,
@@ -232,8 +227,8 @@ class PlanFilter:
custom_prompt_block = ""
if global_config.custom_prompt.planner_custom_prompt_content:
custom_prompt_block = global_config.custom_prompt.planner_custom_prompt_content
users_in_chat_str = "" # TODO: Re-implement user list fetching if needed
users_in_chat_str = "" # TODO: Re-implement user list fetching if needed
planner_prompt_template = await global_prompt_manager.get_prompt_async("planner_prompt")
prompt = planner_prompt_template.format(
@@ -241,7 +236,8 @@ class PlanFilter:
mood_block=mood_block,
time_block=time_block,
chat_context_description=chat_context_description,
chat_content_block=chat_content_block,
read_history_block=read_history_block,
unread_history_block=unread_history_block,
actions_before_now_block=actions_before_now_block,
mentioned_bonus=mentioned_bonus,
no_action_block=no_action_block,
@@ -250,7 +246,7 @@ class PlanFilter:
identity_block=identity_block,
custom_prompt_block=custom_prompt_block,
bot_name=bot_name,
users_in_chat=users_in_chat_str
users_in_chat=users_in_chat_str,
)
return prompt, message_id_list
except Exception as e:
@@ -258,6 +254,114 @@ class PlanFilter:
logger.error(traceback.format_exc())
return "构建 Planner Prompt 时出错", []
async def _build_read_unread_history_blocks(self, plan: Plan) -> tuple[str, str, list]:
"""构建已读/未读历史消息块"""
try:
# 从message_manager获取真实的已读/未读消息
from src.chat.message_manager.message_manager import message_manager
from src.chat.utils.utils import assign_message_ids
# 获取聊天流的上下文
stream_context = message_manager.stream_contexts.get(plan.chat_id)
if not stream_context:
# 如果没有找到对应的上下文,使用兼容性处理
return await self._fallback_build_history_blocks(plan)
# 获取真正的已读和未读消息
read_messages = stream_context.history_messages # 已读消息存储在history_messages中
unread_messages = stream_context.get_unread_messages() # 获取未读消息
# 构建已读历史消息块
if read_messages:
read_content, read_ids = build_readable_messages_with_id(
messages=[msg.flatten() for msg in read_messages[-50:]], # 限制数量
timestamp_mode="normal_no_YMD",
truncate=False,
show_actions=False,
)
read_history_block = f"{read_content}"
else:
read_history_block = "暂无已读历史消息"
# 构建未读历史消息块(包含兴趣度)
if unread_messages:
# 扁平化未读消息用于计算兴趣度和格式化
flattened_unread = [msg.flatten() for msg in unread_messages]
# 尝试获取兴趣度评分(返回以真实 message_id 为键的字典)
interest_scores = await self._get_interest_scores_for_messages(flattened_unread)
# 为未读消息分配短 id保持与 build_readable_messages_with_id 的一致结构)
message_id_list = assign_message_ids(flattened_unread)
unread_lines = []
for idx, msg in enumerate(flattened_unread):
mapped = message_id_list[idx]
synthetic_id = mapped.get("id")
original_msg_id = msg.get("message_id") or msg.get("id")
msg_time = time.strftime("%H:%M:%S", time.localtime(msg.get("time", time.time())))
msg_content = msg.get("processed_plain_text", "")
# 添加兴趣度信息
interest_score = interest_scores.get(original_msg_id, 0.0)
interest_text = f" [兴趣度: {interest_score:.3f}]" if interest_score > 0 else ""
# 在未读行中显示合成id方便 planner 返回时使用
unread_lines.append(f"{msg_time} {synthetic_id}: {msg_content}{interest_text}")
unread_history_block = "\n".join(unread_lines)
else:
unread_history_block = "暂无未读历史消息"
return read_history_block, unread_history_block, message_id_list
except Exception as e:
logger.error(f"构建已读/未读历史消息块时出错: {e}")
return "构建已读历史消息时出错", "构建未读历史消息时出错", []
async def _get_interest_scores_for_messages(self, messages: List[dict]) -> dict[str, float]:
"""为消息获取兴趣度评分"""
interest_scores = {}
try:
from src.chat.affinity_flow.interest_scoring import interest_scoring_system
from src.common.data_models.database_data_model import DatabaseMessages
# 转换消息格式
db_messages = []
for msg_dict in messages:
try:
db_msg = DatabaseMessages(
message_id=msg_dict.get("message_id", ""),
time=msg_dict.get("time", time.time()),
chat_id=msg_dict.get("chat_id", ""),
processed_plain_text=msg_dict.get("processed_plain_text", ""),
user_id=msg_dict.get("user_id", ""),
user_nickname=msg_dict.get("user_nickname", ""),
user_platform=msg_dict.get("platform", "qq"),
chat_info_group_id=msg_dict.get("group_id", ""),
chat_info_group_name=msg_dict.get("group_name", ""),
chat_info_group_platform=msg_dict.get("platform", "qq"),
)
db_messages.append(db_msg)
except Exception as e:
logger.warning(f"转换消息格式失败: {e}")
continue
# 计算兴趣度评分
if db_messages:
bot_nickname = global_config.bot.nickname or "麦麦"
scores = await interest_scoring_system.calculate_interest_scores(db_messages, bot_nickname)
# 构建兴趣度字典
for score in scores:
interest_scores[score.message_id] = score.total_score
except Exception as e:
logger.warning(f"获取兴趣度评分失败: {e}")
return interest_scores
async def _parse_single_action(
self, action_json: dict, message_id_list: list, plan: Plan
) -> List[ActionPlannerInfo]:
@@ -281,13 +385,18 @@ class PlanFilter:
else:
# 如果找不到目标消息对于reply动作来说这是必需的应该记录警告
if action == "reply":
logger.warning(f"reply动作找不到目标消息target_message_id: {action_json.get('target_message_id')}")
logger.warning(
f"reply动作找不到目标消息target_message_id: {action_json.get('target_message_id')}"
)
# 将reply动作改为no_action避免后续执行时出错
action = "no_action"
reasoning = f"找不到目标消息进行回复。原始理由: {reasoning}"
available_action_names = list(plan.available_actions.keys())
if action not in ["no_action", "no_reply", "reply", "do_nothing", "proactive_reply"] and action not in available_action_names:
if (
action not in ["no_action", "no_reply", "reply", "do_nothing", "proactive_reply"]
and action not in available_action_names
):
reasoning = f"LLM 返回了当前不可用的动作 '{action}'。原始理由: {reasoning}"
action = "no_action"
@@ -310,9 +419,7 @@ class PlanFilter:
)
return parsed_actions
def _filter_no_actions(
self, action_list: List[ActionPlannerInfo]
) -> List[ActionPlannerInfo]:
def _filter_no_actions(self, action_list: List[ActionPlannerInfo]) -> List[ActionPlannerInfo]:
non_no_actions = [a for a in action_list if a.action_type not in ["no_action", "no_reply"]]
if non_no_actions:
return non_no_actions
@@ -361,11 +468,47 @@ class PlanFilter:
return action_options_block
def _find_message_by_id(self, message_id: str, message_id_list: list) -> Optional[Dict[str, Any]]:
# 兼容多种 message_id 格式数字、m123、buffered-xxxx
# 如果是纯数字,补上 m 前缀以兼容旧格式
candidate_ids = {message_id}
if message_id.isdigit():
message_id = f"m{message_id}"
candidate_ids.add(f"m{message_id}")
# 如果是 m 开头且后面是数字,尝试去掉 m 前缀的数字形式
if message_id.startswith("m") and message_id[1:].isdigit():
candidate_ids.add(message_id[1:])
# 逐项匹配 message_id_list每项可能为 {'id':..., 'message':...}
for item in message_id_list:
if item.get("id") == message_id:
# 支持 message_id_list 中直接是字符串/ID 的情形
if isinstance(item, str):
if item in candidate_ids:
# 没有 message 对象返回None
return None
continue
if not isinstance(item, dict):
continue
item_id = item.get("id")
# 直接匹配分配的短 id
if item_id and item_id in candidate_ids:
return item.get("message")
# 有时 message 存储里会有原始的 message_id 字段(如 buffered-xxxx
message_obj = item.get("message")
if isinstance(message_obj, dict):
orig_mid = message_obj.get("message_id") or message_obj.get("id")
if orig_mid and orig_mid in candidate_ids:
return message_obj
# 作为兜底,尝试在 message_id_list 中找到 message.message_id 匹配
for item in message_id_list:
if isinstance(item, dict) and isinstance(item.get("message"), dict):
mid = item["message"].get("message_id") or item["message"].get("id")
if mid == message_id:
return item["message"]
return None
def _get_latest_message(self, message_id_list: list) -> Optional[Dict[str, Any]]:

View File

@@ -64,13 +64,15 @@ class ActionPlanner:
"other_actions_executed": 0,
}
async def plan(self, mode: ChatMode = ChatMode.FOCUS, unread_messages: List[Dict] = None) -> Tuple[List[Dict], Optional[Dict]]:
async def plan(self, mode: ChatMode = ChatMode.FOCUS, message_data: dict = None) -> Tuple[List[Dict], Optional[Dict]]:
"""
执行完整的增强版规划流程。
Args:
mode (ChatMode): 当前的聊天模式,默认为 FOCUS。
unread_messages (List[Dict]): 未读消息列表,用于兴趣度计算。
message_data (dict): 消息数据字典,包含:
- unread_messages: 未读消息列表
- history_messages: 历史消息列表(可选)
Returns:
Tuple[List[Dict], Optional[Dict]]: 一个元组,包含:
@@ -78,6 +80,8 @@ class ActionPlanner:
- final_target_message_dict (Optional[Dict]): 最终的目标消息(字典格式)。
"""
try:
# 提取未读消息用于兴趣度计算
unread_messages = message_data.get("unread_messages", []) if message_data else []
self.planner_stats["total_plans"] += 1
return await self._enhanced_plan_flow(mode, unread_messages or [])
@@ -118,12 +122,13 @@ class ActionPlanner:
logger.info(f"❌ 兴趣度不足阈值的80%: {score:.3f} < {threshold_requirement:.3f}直接返回no_action")
logger.info(f"📊 最低要求: 阈值({base_threshold:.3f}) × 0.8 = {threshold_requirement:.3f}")
# 直接返回 no_action
no_action = {
"action_type": "no_action",
"reason": f"兴趣度评分 {score:.3f} 未达阈值80% {threshold_requirement:.3f}",
"action_data": {},
"action_message": None,
}
from src.common.data_models.info_data_model import ActionPlannerInfo
no_action = ActionPlannerInfo(
action_type="no_action",
reasoning=f"兴趣度评分 {score:.3f} 未达阈值80% {threshold_requirement:.3f}",
action_data={},
action_message=None,
)
filtered_plan = initial_plan
filtered_plan.decided_actions = [no_action]
else:

View File

@@ -4,6 +4,7 @@
通过将提示词与代码逻辑分离,可以更方便地对模型的行为进行迭代和优化,
而无需修改核心代码。
"""
from src.chat.utils.prompt import Prompt
@@ -25,7 +26,12 @@ def init_prompts():
{users_in_chat}
{custom_prompt_block}
{chat_context_description},以下是具体的聊天内容。
{chat_content_block}
## 📜 已读历史消息(仅供参考)
{read_history_block}
## 📬 未读历史消息(动作执行对象)
{unread_history_block}
{moderation_prompt}
@@ -35,10 +41,13 @@ def init_prompts():
2. **辅助动作 (可选)**: 这是为了增强表达效果的附加动作,例如 `emoji`(发送表情包)或 `poke_user`(戳一戳)。
**决策流程:**
1. 首先,决定是否要进行 `reply`(如果有)。
2. 然后,评估当前的对话气氛和用户情绪,判断是否需要一个**辅助动作**来让你的回应更生动、更符合你的性格。
3. 如果需要,选择一个最合适的辅助动作与 `reply`(如果有) 组合
4. 如果用户明确要求了某个动作,请务必优先满足
1. **重要:已读历史消息仅作为当前聊天情景的参考,帮助你理解对话上下文。**
2. **重要:所有动作的执行对象只能是未读历史消息中的消息,不能对已读消息执行动作。**
3. 在未读历史消息中,优先对兴趣值高的消息做出动作(兴趣值标注在消息末尾)
4. 首先,决定是否要对未读消息进行 `reply`(如果有)
5. 然后,评估当前的对话气氛和用户情绪,判断是否需要一个**辅助动作**来让你的回应更生动、更符合你的性格。
6. 如果需要,选择一个最合适的辅助动作与 `reply`(如果有) 组合。
7. 如果用户明确要求了某个动作,请务必优先满足。
**如果可选动作中没有reply请不要使用**
@@ -77,7 +86,9 @@ def init_prompts():
]
**重要规则:**
当 `reply` 和 `emoji` 动作同时被选择时,`emoji` 动作的 `reason` 字段必须包含 `reply` 动作最终生成的回复文本内容。你需要将 `<TEXT>` 占位符替换为 `reply` 动作的 `reason` 字段内容,以确保表情包的选择与回复文本高度相关。
1. 当 `reply` 和 `emoji` 动作同时被选择时,`emoji` 动作的 `reason` 字段必须包含 `reply` 动作最终生成的回复文本内容。你需要将 `<TEXT>` 占位符替换为 `reply` 动作的 `reason` 字段内容,以确保表情包的选择与回复文本高度相关。
2. **动作执行限制所有动作的target_message_id必须是未读历史消息中的消息ID(消息ID格式:m123)。**
3. **兴趣度优先:在多个未读消息中,优先选择兴趣值高的消息进行回复。**
不要输出markdown格式```json等内容直接输出且仅包含 JSON 列表内容:
""",
@@ -161,4 +172,4 @@ def init_prompts():
# 在模块加载时自动调用,完成提示词的注册。
init_prompts()
init_prompts()

View File

@@ -83,13 +83,13 @@ def init_prompt():
- {schedule_block}
## 历史记录
### 当前群聊中的所有人的聊天记录:
{background_dialogue_prompt}
### 📜 已读历史消息(仅供参考)
{read_history_prompt}
{cross_context_block}
### 当前群聊中正在与你对话的聊天记录
{core_dialogue_prompt}
### 📬 未读历史消息(动作执行对象)
{unread_history_prompt}
## 表达方式
- *你需要参考你的回复风格:*
@@ -119,6 +119,11 @@ def init_prompt():
## 规则
{safety_guidelines_block}
**重要提醒:**
- **已读历史消息仅作为当前聊天情景的参考**
- **动作执行对象只能是未读历史消息中的消息**
- **请优先对兴趣值高的消息做出回复**(兴趣度标注在未读消息末尾)
在回应之前,首先分析消息的针对性:
1. **直接针对你**@你、回复你、明确询问你 → 必须回应
2. **间接相关**:涉及你感兴趣的话题但未直接问你 → 谨慎参与
@@ -657,78 +662,195 @@ class DefaultReplyer:
duration = end_time - start_time
return name, result, duration
def build_s4u_chat_history_prompts(
self, message_list_before_now: List[Dict[str, Any]], target_user_id: str, sender: str
async def build_s4u_chat_history_prompts(
self, message_list_before_now: List[Dict[str, Any]], target_user_id: str, sender: str, chat_id: str
) -> Tuple[str, str]:
"""
构建 s4u 风格的分离对话 prompt
构建 s4u 风格的已读/未读历史消息 prompt
Args:
message_list_before_now: 历史消息列表
target_user_id: 目标用户ID当前对话对象
sender: 发送者名称
chat_id: 聊天ID
Returns:
Tuple[str, str]: (核心对话prompt, 背景对话prompt)
Tuple[str, str]: (已读历史消息prompt, 未读历史消息prompt)
"""
core_dialogue_list = []
try:
# 从message_manager获取真实的已读/未读消息
from src.chat.message_manager.message_manager import message_manager
# 获取聊天流的上下文
stream_context = message_manager.stream_contexts.get(chat_id)
if stream_context:
# 使用真正的已读和未读消息
read_messages = stream_context.history_messages # 已读消息
unread_messages = stream_context.get_unread_messages() # 未读消息
# 构建已读历史消息 prompt
read_history_prompt = ""
if read_messages:
read_content = build_readable_messages(
[msg.flatten() for msg in read_messages[-50:]], # 限制数量
replace_bot_name=True,
timestamp_mode="normal_no_YMD",
truncate=True,
)
read_history_prompt = f"这是已读历史消息,仅作为当前聊天情景的参考:\n{read_content}"
else:
read_history_prompt = "暂无已读历史消息"
# 构建未读历史消息 prompt包含兴趣度
unread_history_prompt = ""
if unread_messages:
# 尝试获取兴趣度评分
interest_scores = await self._get_interest_scores_for_messages([msg.flatten() for msg in unread_messages])
unread_lines = []
for msg in unread_messages:
msg_id = msg.message_id
msg_time = time.strftime('%H:%M:%S', time.localtime(msg.time))
msg_content = msg.processed_plain_text
# 添加兴趣度信息
interest_score = interest_scores.get(msg_id, 0.0)
interest_text = f" [兴趣度: {interest_score:.3f}]" if interest_score > 0 else ""
unread_lines.append(f"{msg_time}: {msg_content}{interest_text}")
unread_history_prompt_str = "\n".join(unread_lines)
unread_history_prompt = f"这是未读历史消息,包含兴趣度评分,请优先对兴趣值高的消息做出动作:\n{unread_history_prompt_str}"
else:
unread_history_prompt = "暂无未读历史消息"
return read_history_prompt, unread_history_prompt
else:
# 回退到传统方法
return await self._fallback_build_chat_history_prompts(message_list_before_now, target_user_id, sender)
except Exception as e:
logger.warning(f"获取已读/未读历史消息失败,使用回退方法: {e}")
return await self._fallback_build_chat_history_prompts(message_list_before_now, target_user_id, sender)
async def _fallback_build_chat_history_prompts(
self, message_list_before_now: List[Dict[str, Any]], target_user_id: str, sender: str
) -> Tuple[str, str]:
"""
回退的已读/未读历史消息构建方法
"""
# 通过is_read字段分离已读和未读消息
read_messages = []
unread_messages = []
bot_id = str(global_config.bot.qq_account)
# 过滤消息分离bot和目标用户的对话 vs 其他用户的对话
for msg_dict in message_list_before_now:
try:
msg_user_id = str(msg_dict.get("user_id"))
reply_to = msg_dict.get("reply_to", "")
_platform, reply_to_user_id = self._parse_reply_target(reply_to)
if (msg_user_id == bot_id and reply_to_user_id == target_user_id) or msg_user_id == target_user_id:
# bot 和目标用户的对话
core_dialogue_list.append(msg_dict)
if msg_dict.get("is_read", False):
read_messages.append(msg_dict)
else:
unread_messages.append(msg_dict)
except Exception as e:
logger.error(f"处理消息记录时出错: {msg_dict}, 错误: {e}")
# 构建背景对话 prompt
all_dialogue_prompt = ""
if message_list_before_now:
latest_25_msgs = message_list_before_now[-int(global_config.chat.max_context_size) :]
all_dialogue_prompt_str = build_readable_messages(
latest_25_msgs,
# 如果没有is_read字段使用原有的逻辑
if not read_messages and not unread_messages:
# 使用原有的核心对话逻辑
core_dialogue_list = []
for msg_dict in message_list_before_now:
try:
msg_user_id = str(msg_dict.get("user_id"))
reply_to = msg_dict.get("reply_to", "")
_platform, reply_to_user_id = self._parse_reply_target(reply_to)
if (msg_user_id == bot_id and reply_to_user_id == target_user_id) or msg_user_id == target_user_id:
core_dialogue_list.append(msg_dict)
except Exception as e:
logger.error(f"处理消息记录时出错: {msg_dict}, 错误: {e}")
read_messages = [msg for msg in message_list_before_now if msg not in core_dialogue_list]
unread_messages = core_dialogue_list
# 构建已读历史消息 prompt
read_history_prompt = ""
if read_messages:
read_content = build_readable_messages(
read_messages[-50:],
replace_bot_name=True,
timestamp_mode="normal",
timestamp_mode="normal_no_YMD",
truncate=True,
)
all_dialogue_prompt = f"所有用户的发言:\n{all_dialogue_prompt_str}"
read_history_prompt = f"这是已读历史消息,仅作为当前聊天情景的参考:\n{read_content}"
else:
read_history_prompt = "暂无已读历史消息"
# 构建核心对话 prompt
core_dialogue_prompt = ""
if core_dialogue_list:
# 检查最新五条消息中是否包含bot自己说的消息
latest_5_messages = core_dialogue_list[-5:] if len(core_dialogue_list) >= 5 else core_dialogue_list
has_bot_message = any(str(msg.get("user_id")) == bot_id for msg in latest_5_messages)
# 构建未读历史消息 prompt
unread_history_prompt = ""
if unread_messages:
# 尝试获取兴趣度评分
interest_scores = await self._get_interest_scores_for_messages(unread_messages)
# logger.info(f"最新五条消息:{latest_5_messages}")
# logger.info(f"最新五条消息中是否包含bot自己说的消息{has_bot_message}")
unread_lines = []
for msg in unread_messages:
msg_id = msg.get("message_id", "")
msg_time = time.strftime('%H:%M:%S', time.localtime(msg.get("time", time.time())))
msg_content = msg.get("processed_plain_text", "")
# 如果最新五条消息中不包含bot的消息则返回空字符串
if not has_bot_message:
core_dialogue_prompt = ""
else:
core_dialogue_list = core_dialogue_list[-int(global_config.chat.max_context_size * 2) :] # 限制消息数量
# 添加兴趣度信息
interest_score = interest_scores.get(msg_id, 0.0)
interest_text = f" [兴趣度: {interest_score:.3f}]" if interest_score > 0 else ""
core_dialogue_prompt_str = build_readable_messages(
core_dialogue_list,
replace_bot_name=True,
merge_messages=False,
timestamp_mode="normal_no_YMD",
read_mark=0.0,
truncate=True,
show_actions=True,
)
core_dialogue_prompt = f"""--------------------------------
这是你和{sender}的对话,你们正在交流中:
{core_dialogue_prompt_str}
--------------------------------
"""
unread_lines.append(f"{msg_time}: {msg_content}{interest_text}")
return core_dialogue_prompt, all_dialogue_prompt
unread_history_prompt_str = "\n".join(unread_lines)
unread_history_prompt = f"这是未读历史消息,包含兴趣度评分,请优先对兴趣值高的消息做出动作:\n{unread_history_prompt_str}"
else:
unread_history_prompt = "暂无未读历史消息"
return read_history_prompt, unread_history_prompt
async def _get_interest_scores_for_messages(self, messages: List[dict]) -> dict[str, float]:
"""为消息获取兴趣度评分"""
interest_scores = {}
try:
from src.chat.affinity_flow.interest_scoring import interest_scoring_system
from src.common.data_models.database_data_model import DatabaseMessages
# 转换消息格式
db_messages = []
for msg_dict in messages:
try:
db_msg = DatabaseMessages(
message_id=msg_dict.get("message_id", ""),
time=msg_dict.get("time", time.time()),
chat_id=msg_dict.get("chat_id", ""),
processed_plain_text=msg_dict.get("processed_plain_text", ""),
user_id=msg_dict.get("user_id", ""),
user_nickname=msg_dict.get("user_nickname", ""),
user_platform=msg_dict.get("platform", "qq"),
chat_info_group_id=msg_dict.get("group_id", ""),
chat_info_group_name=msg_dict.get("group_name", ""),
chat_info_group_platform=msg_dict.get("platform", "qq"),
)
db_messages.append(db_msg)
except Exception as e:
logger.warning(f"转换消息格式失败: {e}")
continue
# 计算兴趣度评分
if db_messages:
bot_nickname = global_config.bot.nickname or "麦麦"
scores = await interest_scoring_system.calculate_interest_scores(db_messages, bot_nickname)
# 构建兴趣度字典
for score in scores:
interest_scores[score.message_id] = score.total_score
except Exception as e:
logger.warning(f"获取兴趣度评分失败: {e}")
return interest_scores
def build_mai_think_context(
self,

View File

@@ -441,15 +441,16 @@ class Prompt:
"""构建S4U模式的聊天上下文"""
if not self.parameters.message_list_before_now_long:
return
core_dialogue, background_dialogue = await self._build_s4u_chat_history_prompts(
read_history_prompt, unread_history_prompt = await self._build_s4u_chat_history_prompts(
self.parameters.message_list_before_now_long,
self.parameters.target_user_info.get("user_id") if self.parameters.target_user_info else "",
self.parameters.sender
self.parameters.sender,
self.parameters.chat_id
)
context_data["core_dialogue_prompt"] = core_dialogue
context_data["background_dialogue_prompt"] = background_dialogue
context_data["read_history_prompt"] = read_history_prompt
context_data["unread_history_prompt"] = unread_history_prompt
async def _build_normal_chat_context(self, context_data: Dict[str, Any]) -> None:
"""构建normal模式的聊天上下文"""
@@ -460,62 +461,22 @@ class Prompt:
{self.parameters.chat_talking_prompt_short}"""
async def _build_s4u_chat_history_prompts(
self, message_list_before_now: List[Dict[str, Any]], target_user_id: str, sender: str
self, message_list_before_now: List[Dict[str, Any]], target_user_id: str, sender: str, chat_id: str
) -> Tuple[str, str]:
"""构建S4U风格的分离对话prompt"""
# 实现逻辑与原有SmartPromptBuilder相同
core_dialogue_list = []
bot_id = str(global_config.bot.qq_account)
for msg_dict in message_list_before_now:
try:
msg_user_id = str(msg_dict.get("user_id"))
reply_to = msg_dict.get("reply_to", "")
platform, reply_to_user_id = Prompt.parse_reply_target(reply_to)
if (msg_user_id == bot_id and reply_to_user_id == target_user_id) or msg_user_id == target_user_id:
core_dialogue_list.append(msg_dict)
except Exception as e:
logger.error(f"处理消息记录时出错: {msg_dict}, 错误: {e}")
# 构建背景对话 prompt
all_dialogue_prompt = ""
if message_list_before_now:
latest_25_msgs = message_list_before_now[-int(global_config.chat.max_context_size) :]
all_dialogue_prompt_str = build_readable_messages(
latest_25_msgs,
replace_bot_name=True,
timestamp_mode="normal",
truncate=True,
"""构建S4U风格的已读/未读历史消息prompt"""
try:
# 动态导入default_generator以避免循环导入
from src.plugin_system.apis.generator_api import get_replyer
# 创建临时生成器实例来使用其方法
temp_generator = get_replyer(None, chat_id, request_type="prompt_building")
return await temp_generator.build_s4u_chat_history_prompts(
message_list_before_now, target_user_id, sender, chat_id
)
all_dialogue_prompt = f"所有用户的发言:\n{all_dialogue_prompt_str}"
# 构建核心对话 prompt
core_dialogue_prompt = ""
if core_dialogue_list:
latest_5_messages = core_dialogue_list[-5:] if len(core_dialogue_list) >= 5 else core_dialogue_list
has_bot_message = any(str(msg.get("user_id")) == bot_id for msg in latest_5_messages)
if not has_bot_message:
core_dialogue_prompt = ""
else:
core_dialogue_list = core_dialogue_list[-int(global_config.chat.max_context_size * 2) :]
core_dialogue_prompt_str = build_readable_messages(
core_dialogue_list,
replace_bot_name=True,
merge_messages=False,
timestamp_mode="normal_no_YMD",
read_mark=0.0,
truncate=True,
show_actions=True,
)
core_dialogue_prompt = f"""--------------------------------
这是你和{sender}的对话,你们正在交流中:
{core_dialogue_prompt_str}
--------------------------------
"""
return core_dialogue_prompt, all_dialogue_prompt
except Exception as e:
logger.error(f"构建S4U历史消息prompt失败: {e}")
async def _build_expression_habits(self) -> Dict[str, Any]:
"""构建表达习惯"""
@@ -759,9 +720,9 @@ class Prompt:
"action_descriptions": self.parameters.action_descriptions or context_data.get("action_descriptions", ""),
"sender_name": self.parameters.sender or "未知用户",
"mood_state": self.parameters.mood_prompt or context_data.get("mood_state", ""),
"background_dialogue_prompt": context_data.get("background_dialogue_prompt", ""),
"read_history_prompt": context_data.get("read_history_prompt", ""),
"unread_history_prompt": context_data.get("unread_history_prompt", ""),
"time_block": context_data.get("time_block", ""),
"core_dialogue_prompt": context_data.get("core_dialogue_prompt", ""),
"reply_target_block": context_data.get("reply_target_block", ""),
"reply_style": global_config.personality.reply_style,
"keywords_reaction_prompt": self.parameters.keywords_reaction_prompt or context_data.get("keywords_reaction_prompt", ""),

View File

@@ -1,36 +0,0 @@
from typing import Optional, TYPE_CHECKING
from dataclasses import dataclass, field
from . import BaseDataModel
if TYPE_CHECKING:
from .database_data_model import DatabaseMessages
@dataclass
class MessageAndActionModel(BaseDataModel):
chat_id: str = field(default_factory=str)
time: float = field(default_factory=float)
user_id: str = field(default_factory=str)
user_platform: str = field(default_factory=str)
user_nickname: str = field(default_factory=str)
user_cardname: Optional[str] = None
processed_plain_text: Optional[str] = None
display_message: Optional[str] = None
chat_info_platform: str = field(default_factory=str)
is_action_record: bool = field(default=False)
action_name: Optional[str] = None
@classmethod
def from_DatabaseMessages(cls, message: "DatabaseMessages"):
return cls(
chat_id=message.chat_id,
time=message.time,
user_id=message.user_info.user_id,
user_platform=message.user_info.platform,
user_nickname=message.user_info.user_nickname,
user_cardname=message.user_info.user_cardname,
processed_plain_text=message.processed_plain_text,
display_message=message.display_message,
chat_info_platform=message.chat_info.platform,
)

View File

@@ -49,11 +49,11 @@ class StreamContext(BaseDataModel):
self.unread_messages.remove(msg)
break
def get_context_messages(self, limit: int = 20) -> List["DatabaseMessages"]:
"""获取上下文消息(历史消息+未读消息)"""
def get_history_messages(self, limit: int = 20) -> List["DatabaseMessages"]:
"""获取历史消息"""
# 优先返回最近的历史消息和所有未读消息
recent_history = self.history_messages[-limit:] if len(self.history_messages) > limit else self.history_messages
return recent_history + self.unread_messages
return recent_history
@dataclass