refactor(chat): 重构消息处理流程引入缓冲队列机制

通过引入消息缓冲队列,解决了高频消息场景下的竞态条件和消息丢失问题。
新的处理机制将消息暂存于缓冲区,仅在流处理空闲时才释放到未读队列,
确保了消息处理的原子性和一致性。

核心变更:
- 为每个聊天流创建独立的消息缓冲队列
- 实时跟踪流的处理状态,避免并发访问冲突
- 在处理开始前和完成后自动刷新缓冲区
- 仅在成功执行后清空未读消息,失败时保留消息
- 增加对取消任务和异常情况的容错处理
- 集成缓存统计和监控功能

此优化显著提升了消息处理的可靠性和性能表现。
This commit is contained in:
Windpicker-owo
2025-10-07 16:27:33 +08:00
parent d52ebe20c4
commit 2448f83373
4 changed files with 256 additions and 11 deletions

View File

@@ -110,7 +110,23 @@ class ChatterManager:
self.stats["streams_processed"] += 1
try:
result = await self.instances[stream_id].execute(context)
self.stats["successful_executions"] += 1
# 检查执行结果是否真正成功
success = result.get("success", False)
if success:
self.stats["successful_executions"] += 1
# 只有真正成功时才清空未读消息
try:
from src.chat.message_manager.message_manager import message_manager
await message_manager.clear_stream_unread_messages(stream_id)
logger.debug(f"{stream_id} 处理成功,已清空未读消息")
except Exception as clear_e:
logger.error(f"清除流 {stream_id} 未读消息时发生错误: {clear_e}")
else:
self.stats["failed_executions"] += 1
logger.warning(f"{stream_id} 处理失败,不清空未读消息")
# 从 mood_manager 获取最新的 chat_stream 并同步回 StreamContext
try:
@@ -124,19 +140,14 @@ class ChatterManager:
logger.error(f"同步 chat_stream 回 StreamContext 失败: {sync_e}")
# 记录处理结果
success = result.get("success", False)
actions_count = result.get("actions_count", 0)
logger.debug(f"{stream_id} 处理完成: 成功={success}, 动作数={actions_count}")
# 在处理完成后,清除该流的未读消息
try:
from src.chat.message_manager.message_manager import message_manager
await message_manager.clear_stream_unread_messages(stream_id)
except Exception as clear_e:
logger.error(f"清除流 {stream_id} 未读消息时发生错误: {clear_e}")
return result
except asyncio.CancelledError:
self.stats["failed_executions"] += 1
logger.info(f"{stream_id} 处理被取消,不清空未读消息")
raise
except Exception as e:
self.stats["failed_executions"] += 1
logger.error(f"处理流 {stream_id} 时发生错误: {e}")