feat(affinity-flow): 重构消息处理以使用StreamContext对象

重构AFC消息处理系统,将基于字典的消息数据传递改为直接使用StreamContext对象。主要变更包括:

- 修改AFCManager的process_message方法为process_stream_context,直接接收StreamContext对象
- 在chatter中重构消息处理逻辑,直接从StreamContext获取未读和历史消息
- 移除批量消息处理功能,改为单次StreamContext处理
- 在message_manager中简化消息处理流程,直接传递StreamContext对象
- 添加未读消息清理机制,防止异常情况下消息一直未读

同时优化兴趣度评分系统的参数:
- 调整回复阈值从0.55到0.56
- 增加最大不回复次数从15到20
- 提升每次不回复的概率增加从0.01到0.02
- 优化提及奖励从3.0降到1.0
- 调整回复后的不回复计数减少从1到3

BREAKING CHANGE: AFCManager的process_message方法已重命名为process_stream_context,参数从message_data改为context对象
This commit is contained in:
Windpicker-owo
2025-09-18 22:27:29 +08:00
parent 9a2320944b
commit 3193927a76
11 changed files with 487 additions and 243 deletions

View File

@@ -5,13 +5,16 @@
import asyncio
import time
import traceback
from typing import Dict, Optional, Any
from typing import Dict, Optional, Any, TYPE_CHECKING
from src.common.logger import get_logger
from src.common.data_models.database_data_model import DatabaseMessages
from src.common.data_models.message_manager_data_model import StreamContext, MessageManagerStats, StreamStats
from src.chat.affinity_flow.afc_manager import afc_manager
if TYPE_CHECKING:
from src.common.data_models.message_manager_data_model import StreamContext
logger = get_logger("message_manager")
@@ -120,44 +123,21 @@ class MessageManager:
logger.debug(f"开始处理聊天流 {stream_id}{len(unread_messages)} 条未读消息")
# 获取上下文消息
context_messages = context.get_context_messages()
# 批量处理消息
messages_data = []
for msg in unread_messages:
message_data = {
"message_info": {
"platform": msg.user_info.platform,
"user_info": {
"user_id": msg.user_info.user_id,
"user_nickname": msg.user_info.user_nickname,
"user_cardname": msg.user_info.user_cardname,
"platform": msg.user_info.platform
},
"group_info": {
"group_id": msg.group_info.group_id,
"group_name": msg.group_info.group_name,
"group_platform": msg.group_info.group_platform
} if msg.group_info else None
},
"processed_plain_text": msg.processed_plain_text,
"context_messages": [ctx_msg.flatten() for ctx_msg in context_messages],
"unread_messages": unread_messages # 传递原始对象而不是字典
}
messages_data.append(message_data)
# 发送到AFC处理器
if messages_data:
results = await afc_manager.process_messages_batch(stream_id, messages_data)
# 处理结果,标记消息为已读
for i, result in enumerate(results):
if result.get("success", False):
msg_id = unread_messages[i].message_id
context.mark_message_as_read(msg_id)
self.stats.total_processed_messages += 1
logger.debug(f"消息 {msg_id} 处理完成,标记为已读")
# 直接使用StreamContext对象进行处理
if unread_messages:
try:
# 发送到AFC处理器传递StreamContext对象
results = await afc_manager.process_stream_context(stream_id, context)
# 处理结果,标记消息为已读
if results.get("success", False):
self._clear_all_unread_messages(context)
except Exception as e:
# 发生异常时,清除所有未读消息,防止意外关闭等导致消息一直未读
logger.error(f"处理聊天流 {stream_id} 时发生异常,将清除所有未读消息: {e}")
self._clear_all_unread_messages(context)
raise
logger.debug(f"聊天流 {stream_id} 消息处理完成")
@@ -227,6 +207,23 @@ class MessageManager:
del self.stream_contexts[stream_id]
logger.info(f"清理不活跃聊天流: {stream_id}")
def _clear_all_unread_messages(self, context: StreamContext):
"""清除指定上下文中的所有未读消息,防止意外情况导致消息一直未读"""
unread_messages = context.get_unread_messages()
if not unread_messages:
return
logger.warning(f"正在清除 {len(unread_messages)} 条未读消息")
# 将所有未读消息标记为已读并移动到历史记录
for msg in unread_messages[:]: # 使用切片复制避免迭代时修改列表
try:
context.mark_message_as_read(msg.message_id)
self.stats.total_processed_messages += 1
logger.info(f"强制清除消息 {msg.message_id},标记为已读")
except Exception as e:
logger.error(f"清除消息 {msg.message_id} 时出错: {e}")
# 创建全局消息管理器实例
message_manager = MessageManager()