refactor(chat): 重构关系系统并优化消息打断处理机制

- 移除独立的RelationshipConfig,将关系追踪参数整合到AffinityFlowConfig
- 实现消息打断后立即重新处理流程,提升交互响应性
- 优化关系追踪系统,添加概率筛选和超时保护机制
- 改进机器人自引用处理,确保消息内容正确显示
- 增强用户信息提取逻辑,兼容多种消息格式
- 添加异步后台任务处理,避免阻塞主回复流程
- 调整兴趣评分阈值和权重参数,优化消息匹配精度
This commit is contained in:
Windpicker-owo
2025-10-08 22:33:10 +08:00
parent b3ae8e4f1a
commit dd1444cc41
13 changed files with 368 additions and 107 deletions

View File

@@ -380,11 +380,11 @@ class MessageManager:
else:
logger.warning(f"消息打断未能取消任何任务: {chat_stream.stream_id}")
# 增加打断计数并应用afc阈值降低
# 增加打断计数
await chat_stream.context_manager.context.increment_interruption_count()
chat_stream.context_manager.context.apply_interruption_afc_reduction(
global_config.chat.interruption_afc_reduction
)
# 🚀 新增:打断后立即重新进入聊天流程
await self._trigger_immediate_reprocess(chat_stream)
# 检查是否已达到最大次数
if chat_stream.context_manager.context.interruption_count >= global_config.chat.interruption_max_limit:
@@ -393,11 +393,64 @@ class MessageManager:
)
else:
logger.info(
f"聊天流 {chat_stream.stream_id} 已打断,当前打断次数: {chat_stream.context_manager.context.interruption_count}/{global_config.chat.interruption_max_limit}, afc阈值调整: {chat_stream.context_manager.context.get_afc_threshold_adjustment()}"
f"聊天流 {chat_stream.stream_id} 已打断并重新进入处理流程,当前打断次数: {chat_stream.context_manager.context.interruption_count}/{global_config.chat.interruption_max_limit}"
)
else:
logger.debug(f"聊天流 {chat_stream.stream_id} 未触发打断,打断概率: {interruption_probability:.2f},检测到 {len(all_processing_tasks)} 个任务")
async def _trigger_immediate_reprocess(self, chat_stream: ChatStream):
"""打断后立即重新进入聊天流程"""
try:
stream_id = chat_stream.stream_id
logger.info(f"🚀 打断后立即重新处理聊天流: {stream_id}")
# 等待一小段时间确保当前消息已经添加到未读消息中
await asyncio.sleep(0.1)
# 获取当前的stream context
context = chat_stream.stream_context
# 确保有未读消息需要处理
unread_messages = context.get_unread_messages()
if not unread_messages:
logger.debug(f"💭 聊天流 {stream_id} 没有未读消息,跳过重新处理")
return
logger.info(f"💬 开始重新处理 {len(unread_messages)} 条未读消息: {stream_id}")
# 创建新的处理任务
task = asyncio.create_task(
self.chatter_manager.process_stream_context(stream_id, context),
name=f"reprocess_{stream_id}_{int(time.time())}"
)
# 设置处理任务
self.chatter_manager.set_processing_task(stream_id, task)
# 等待处理完成(使用超时防止无限等待)
try:
result = await asyncio.wait_for(task, timeout=30.0)
success = result.get("success", False)
actions_count = result.get("actions_count", 0)
if success:
logger.info(f"✅ 聊天流 {stream_id} 重新处理成功: 执行了 {actions_count} 个动作")
else:
logger.warning(f"❌ 聊天流 {stream_id} 重新处理失败")
except asyncio.TimeoutError:
logger.warning(f"⏰ 聊天流 {stream_id} 重新处理超时")
if not task.done():
task.cancel()
except Exception as e:
logger.error(f"💥 聊天流 {stream_id} 重新处理出错: {e}")
if not task.done():
task.cancel()
except Exception as e:
logger.error(f"🚨 触发重新处理时出错: {e}")
async def clear_all_unread_messages(self, stream_id: str):
"""清除指定上下文中的所有未读消息,在消息处理完成后调用"""
try: