refactor(chat): 重构消息处理流程引入缓冲队列机制
通过引入消息缓冲队列,解决了高频消息场景下的竞态条件和消息丢失问题。 新的处理机制将消息暂存于缓冲区,仅在流处理空闲时才释放到未读队列, 确保了消息处理的原子性和一致性。 核心变更: - 为每个聊天流创建独立的消息缓冲队列 - 实时跟踪流的处理状态,避免并发访问冲突 - 在处理开始前和完成后自动刷新缓冲区 - 仅在成功执行后清空未读消息,失败时保留消息 - 增加对取消任务和异常情况的容错处理 - 集成缓存统计和监控功能 此优化显著提升了消息处理的可靠性和性能表现。
This commit is contained in:
@@ -110,7 +110,23 @@ class ChatterManager:
|
||||
self.stats["streams_processed"] += 1
|
||||
try:
|
||||
result = await self.instances[stream_id].execute(context)
|
||||
self.stats["successful_executions"] += 1
|
||||
|
||||
# 检查执行结果是否真正成功
|
||||
success = result.get("success", False)
|
||||
|
||||
if success:
|
||||
self.stats["successful_executions"] += 1
|
||||
|
||||
# 只有真正成功时才清空未读消息
|
||||
try:
|
||||
from src.chat.message_manager.message_manager import message_manager
|
||||
await message_manager.clear_stream_unread_messages(stream_id)
|
||||
logger.debug(f"流 {stream_id} 处理成功,已清空未读消息")
|
||||
except Exception as clear_e:
|
||||
logger.error(f"清除流 {stream_id} 未读消息时发生错误: {clear_e}")
|
||||
else:
|
||||
self.stats["failed_executions"] += 1
|
||||
logger.warning(f"流 {stream_id} 处理失败,不清空未读消息")
|
||||
|
||||
# 从 mood_manager 获取最新的 chat_stream 并同步回 StreamContext
|
||||
try:
|
||||
@@ -124,19 +140,14 @@ class ChatterManager:
|
||||
logger.error(f"同步 chat_stream 回 StreamContext 失败: {sync_e}")
|
||||
|
||||
# 记录处理结果
|
||||
success = result.get("success", False)
|
||||
actions_count = result.get("actions_count", 0)
|
||||
logger.debug(f"流 {stream_id} 处理完成: 成功={success}, 动作数={actions_count}")
|
||||
|
||||
# 在处理完成后,清除该流的未读消息
|
||||
try:
|
||||
from src.chat.message_manager.message_manager import message_manager
|
||||
|
||||
await message_manager.clear_stream_unread_messages(stream_id)
|
||||
except Exception as clear_e:
|
||||
logger.error(f"清除流 {stream_id} 未读消息时发生错误: {clear_e}")
|
||||
|
||||
return result
|
||||
except asyncio.CancelledError:
|
||||
self.stats["failed_executions"] += 1
|
||||
logger.info(f"流 {stream_id} 处理被取消,不清空未读消息")
|
||||
raise
|
||||
except Exception as e:
|
||||
self.stats["failed_executions"] += 1
|
||||
logger.error(f"处理流 {stream_id} 时发生错误: {e}")
|
||||
|
||||
@@ -55,7 +55,51 @@ class SingleStreamContextManager:
|
||||
bool: 是否成功添加
|
||||
"""
|
||||
try:
|
||||
# 直接操作上下文的消息列表
|
||||
# 使用MessageManager的内置缓存系统
|
||||
try:
|
||||
from .message_manager import message_manager
|
||||
|
||||
# 如果MessageManager正在运行,使用缓存系统
|
||||
if message_manager.is_running:
|
||||
# 先计算兴趣值(需要在缓存前计算)
|
||||
await self._calculate_message_interest(message)
|
||||
message.is_read = False
|
||||
|
||||
# 添加到缓存而不是直接添加到未读消息
|
||||
cache_success = message_manager.add_message_to_cache(self.stream_id, message)
|
||||
|
||||
if cache_success:
|
||||
# 自动检测和更新chat type
|
||||
self._detect_chat_type(message)
|
||||
|
||||
self.total_messages += 1
|
||||
self.last_access_time = time.time()
|
||||
|
||||
# 检查当前是否正在处理消息
|
||||
is_processing = message_manager.get_stream_processing_status(self.stream_id)
|
||||
|
||||
if not is_processing:
|
||||
# 如果当前没有在处理,立即刷新缓存到未读消息
|
||||
cached_messages = message_manager.flush_cached_messages(self.stream_id)
|
||||
for cached_msg in cached_messages:
|
||||
self.context.unread_messages.append(cached_msg)
|
||||
logger.debug(f"立即刷新缓存到未读消息: stream={self.stream_id}, 数量={len(cached_messages)}")
|
||||
else:
|
||||
logger.debug(f"消息已缓存,等待当前处理完成: stream={self.stream_id}")
|
||||
|
||||
# 启动流的循环任务(如果还未启动)
|
||||
asyncio.create_task(stream_loop_manager.start_stream_loop(self.stream_id))
|
||||
logger.debug(f"添加消息到缓存系统: {self.stream_id}")
|
||||
return True
|
||||
else:
|
||||
logger.warning(f"消息缓存系统添加失败,回退到直接添加: {self.stream_id}")
|
||||
|
||||
except ImportError:
|
||||
logger.debug("MessageManager不可用,使用直接添加模式")
|
||||
except Exception as e:
|
||||
logger.warning(f"消息缓存系统异常,回退到直接添加: {self.stream_id}, error={e}")
|
||||
|
||||
# 回退方案:直接添加到未读消息
|
||||
message.is_read = False
|
||||
self.context.unread_messages.append(message)
|
||||
|
||||
|
||||
@@ -364,9 +364,17 @@ class StreamLoopManager:
|
||||
logger.warning(f"Chatter管理器未设置: {stream_id}")
|
||||
return False
|
||||
|
||||
# 设置处理状态为正在处理
|
||||
self._set_stream_processing_status(stream_id, True)
|
||||
|
||||
try:
|
||||
start_time = time.time()
|
||||
|
||||
# 在处理开始前,先刷新缓存到未读消息
|
||||
cached_messages = await self._flush_cached_messages_to_unread(stream_id)
|
||||
if cached_messages:
|
||||
logger.info(f"处理开始前刷新缓存消息: stream={stream_id}, 数量={len(cached_messages)}")
|
||||
|
||||
# 直接调用chatter_manager处理流上下文
|
||||
task = asyncio.create_task(self.chatter_manager.process_stream_context(stream_id, context))
|
||||
self.chatter_manager.set_processing_task(stream_id, task)
|
||||
@@ -374,6 +382,11 @@ class StreamLoopManager:
|
||||
success = results.get("success", False)
|
||||
|
||||
if success:
|
||||
# 处理成功后,再次刷新缓存中可能的新消息
|
||||
additional_messages = await self._flush_cached_messages_to_unread(stream_id)
|
||||
if additional_messages:
|
||||
logger.info(f"处理完成后刷新新消息: stream={stream_id}, 数量={len(additional_messages)}")
|
||||
|
||||
asyncio.create_task(self._refresh_focus_energy(stream_id))
|
||||
process_time = time.time() - start_time
|
||||
logger.debug(f"流处理成功: {stream_id} (耗时: {process_time:.2f}s)")
|
||||
@@ -385,6 +398,57 @@ class StreamLoopManager:
|
||||
except Exception as e:
|
||||
logger.error(f"流处理异常: {stream_id} - {e}", exc_info=True)
|
||||
return False
|
||||
finally:
|
||||
# 无论成功或失败,都要设置处理状态为未处理
|
||||
self._set_stream_processing_status(stream_id, False)
|
||||
|
||||
def _set_stream_processing_status(self, stream_id: str, is_processing: bool) -> None:
|
||||
"""设置流的处理状态"""
|
||||
try:
|
||||
from .message_manager import message_manager
|
||||
|
||||
if message_manager.is_running:
|
||||
message_manager.set_stream_processing_status(stream_id, is_processing)
|
||||
logger.debug(f"设置流处理状态: stream={stream_id}, processing={is_processing}")
|
||||
|
||||
except ImportError:
|
||||
logger.debug("MessageManager不可用,跳过状态设置")
|
||||
except Exception as e:
|
||||
logger.warning(f"设置流处理状态失败: stream={stream_id}, error={e}")
|
||||
|
||||
async def _flush_cached_messages_to_unread(self, stream_id: str) -> list:
|
||||
"""将缓存消息刷新到未读消息列表"""
|
||||
try:
|
||||
from .message_manager import message_manager
|
||||
|
||||
if message_manager.is_running and message_manager.has_cached_messages(stream_id):
|
||||
# 获取缓存消息
|
||||
cached_messages = message_manager.flush_cached_messages(stream_id)
|
||||
|
||||
if cached_messages:
|
||||
# 获取聊天流并添加到未读消息
|
||||
from src.plugin_system.apis.chat_api import get_chat_manager
|
||||
|
||||
chat_manager = get_chat_manager()
|
||||
chat_stream = await chat_manager.get_stream(stream_id)
|
||||
|
||||
if chat_stream:
|
||||
for message in cached_messages:
|
||||
chat_stream.context_manager.context.unread_messages.append(message)
|
||||
logger.debug(f"刷新缓存消息到未读列表: stream={stream_id}, 数量={len(cached_messages)}")
|
||||
else:
|
||||
logger.warning(f"无法找到聊天流: {stream_id}")
|
||||
|
||||
return cached_messages
|
||||
|
||||
return []
|
||||
|
||||
except ImportError:
|
||||
logger.debug("MessageManager不可用,跳过缓存刷新")
|
||||
return []
|
||||
except Exception as e:
|
||||
logger.warning(f"刷新缓存消息失败: stream={stream_id}, error={e}")
|
||||
return []
|
||||
|
||||
async def _calculate_interval(self, stream_id: str, has_messages: bool) -> float:
|
||||
"""计算下次检查间隔
|
||||
|
||||
@@ -6,6 +6,7 @@
|
||||
import asyncio
|
||||
import random
|
||||
import time
|
||||
from collections import defaultdict, deque
|
||||
from typing import TYPE_CHECKING, Any
|
||||
|
||||
from src.chat.chatter_manager import ChatterManager
|
||||
@@ -46,6 +47,14 @@ class MessageManager:
|
||||
self.sleep_manager = SleepManager()
|
||||
self.wakeup_manager = WakeUpManager(self.sleep_manager)
|
||||
|
||||
# 消息缓存系统 - 直接集成到消息管理器
|
||||
self.message_caches: Dict[str, deque] = defaultdict(deque) # 每个流的消息缓存
|
||||
self.stream_processing_status: Dict[str, bool] = defaultdict(bool) # 流的处理状态
|
||||
self.cache_stats = {
|
||||
"total_cached_messages": 0,
|
||||
"total_flushed_messages": 0,
|
||||
}
|
||||
|
||||
# 不再需要全局上下文管理器,直接通过 ChatManager 访问各个 ChatStream 的 context_manager
|
||||
|
||||
async def start(self):
|
||||
@@ -72,6 +81,9 @@ class MessageManager:
|
||||
except Exception as e:
|
||||
logger.error(f"启动流缓存管理器失败: {e}")
|
||||
|
||||
# 启动消息缓存系统(内置)
|
||||
logger.info("📦 消息缓存系统已启动")
|
||||
|
||||
# 启动自适应流管理器
|
||||
try:
|
||||
from src.chat.message_manager.adaptive_stream_manager import init_adaptive_stream_manager
|
||||
@@ -115,6 +127,11 @@ class MessageManager:
|
||||
except Exception as e:
|
||||
logger.error(f"停止流缓存管理器失败: {e}")
|
||||
|
||||
# 停止消息缓存系统(内置)
|
||||
self.message_caches.clear()
|
||||
self.stream_processing_status.clear()
|
||||
logger.info("📦 消息缓存系统已停止")
|
||||
|
||||
# 停止自适应流管理器
|
||||
try:
|
||||
from src.chat.message_manager.adaptive_stream_manager import shutdown_adaptive_stream_manager
|
||||
@@ -429,6 +446,115 @@ class MessageManager:
|
||||
except Exception as e:
|
||||
logger.error(f"清除流 {stream_id} 的未读消息时发生错误: {e}")
|
||||
|
||||
# ===== 消息缓存系统方法 =====
|
||||
|
||||
def add_message_to_cache(self, stream_id: str, message: DatabaseMessages) -> bool:
|
||||
"""添加消息到缓存
|
||||
|
||||
Args:
|
||||
stream_id: 流ID
|
||||
message: 消息对象
|
||||
|
||||
Returns:
|
||||
bool: 是否成功添加到缓存
|
||||
"""
|
||||
try:
|
||||
if not self.is_running:
|
||||
return False
|
||||
|
||||
self.message_caches[stream_id].append(message)
|
||||
self.cache_stats["total_cached_messages"] += 1
|
||||
|
||||
logger.debug(f"消息已添加到缓存: stream={stream_id}, content={message.processed_plain_text[:50]}...")
|
||||
return True
|
||||
except Exception as e:
|
||||
logger.error(f"添加消息到缓存失败: stream={stream_id}, error={e}")
|
||||
return False
|
||||
|
||||
def flush_cached_messages(self, stream_id: str) -> list[DatabaseMessages]:
|
||||
"""刷新缓存消息到未读消息列表
|
||||
|
||||
Args:
|
||||
stream_id: 流ID
|
||||
|
||||
Returns:
|
||||
List[DatabaseMessages]: 缓存的消息列表
|
||||
"""
|
||||
try:
|
||||
if stream_id not in self.message_caches:
|
||||
return []
|
||||
|
||||
cached_messages = list(self.message_caches[stream_id])
|
||||
self.message_caches[stream_id].clear()
|
||||
|
||||
self.cache_stats["total_flushed_messages"] += len(cached_messages)
|
||||
|
||||
logger.debug(f"刷新缓存消息: stream={stream_id}, 数量={len(cached_messages)}")
|
||||
return cached_messages
|
||||
except Exception as e:
|
||||
logger.error(f"刷新缓存消息失败: stream={stream_id}, error={e}")
|
||||
return []
|
||||
|
||||
def set_stream_processing_status(self, stream_id: str, is_processing: bool):
|
||||
"""设置流的处理状态
|
||||
|
||||
Args:
|
||||
stream_id: 流ID
|
||||
is_processing: 是否正在处理
|
||||
"""
|
||||
try:
|
||||
self.stream_processing_status[stream_id] = is_processing
|
||||
logger.debug(f"设置流处理状态: stream={stream_id}, processing={is_processing}")
|
||||
except Exception as e:
|
||||
logger.error(f"设置流处理状态失败: stream={stream_id}, error={e}")
|
||||
|
||||
def get_stream_processing_status(self, stream_id: str) -> bool:
|
||||
"""获取流的处理状态
|
||||
|
||||
Args:
|
||||
stream_id: 流ID
|
||||
|
||||
Returns:
|
||||
bool: 是否正在处理
|
||||
"""
|
||||
return self.stream_processing_status.get(stream_id, False)
|
||||
|
||||
def has_cached_messages(self, stream_id: str) -> bool:
|
||||
"""检查流是否有缓存消息
|
||||
|
||||
Args:
|
||||
stream_id: 流ID
|
||||
|
||||
Returns:
|
||||
bool: 是否有缓存消息
|
||||
"""
|
||||
return stream_id in self.message_caches and len(self.message_caches[stream_id]) > 0
|
||||
|
||||
def get_cached_messages_count(self, stream_id: str) -> int:
|
||||
"""获取流的缓存消息数量
|
||||
|
||||
Args:
|
||||
stream_id: 流ID
|
||||
|
||||
Returns:
|
||||
int: 缓存消息数量
|
||||
"""
|
||||
return len(self.message_caches.get(stream_id, []))
|
||||
|
||||
def get_cache_stats(self) -> dict[str, Any]:
|
||||
"""获取缓存统计信息
|
||||
|
||||
Returns:
|
||||
Dict[str, Any]: 缓存统计信息
|
||||
"""
|
||||
return {
|
||||
"total_cached_messages": self.cache_stats["total_cached_messages"],
|
||||
"total_flushed_messages": self.cache_stats["total_flushed_messages"],
|
||||
"active_caches": len(self.message_caches),
|
||||
"cached_streams": len([s for s in self.message_caches.keys() if self.message_caches[s]]),
|
||||
"processing_streams": len([s for s in self.stream_processing_status.keys() if self.stream_processing_status[s]]),
|
||||
}
|
||||
|
||||
|
||||
# 创建全局消息管理器实例
|
||||
message_manager = MessageManager()
|
||||
|
||||
Reference in New Issue
Block a user