diff --git a/src/chat/chatter_manager.py b/src/chat/chatter_manager.py index 3ef7479b4..ce6e83f10 100644 --- a/src/chat/chatter_manager.py +++ b/src/chat/chatter_manager.py @@ -110,7 +110,23 @@ class ChatterManager: self.stats["streams_processed"] += 1 try: result = await self.instances[stream_id].execute(context) - self.stats["successful_executions"] += 1 + + # 检查执行结果是否真正成功 + success = result.get("success", False) + + if success: + self.stats["successful_executions"] += 1 + + # 只有真正成功时才清空未读消息 + try: + from src.chat.message_manager.message_manager import message_manager + await message_manager.clear_stream_unread_messages(stream_id) + logger.debug(f"流 {stream_id} 处理成功,已清空未读消息") + except Exception as clear_e: + logger.error(f"清除流 {stream_id} 未读消息时发生错误: {clear_e}") + else: + self.stats["failed_executions"] += 1 + logger.warning(f"流 {stream_id} 处理失败,不清空未读消息") # 从 mood_manager 获取最新的 chat_stream 并同步回 StreamContext try: @@ -124,19 +140,14 @@ class ChatterManager: logger.error(f"同步 chat_stream 回 StreamContext 失败: {sync_e}") # 记录处理结果 - success = result.get("success", False) actions_count = result.get("actions_count", 0) logger.debug(f"流 {stream_id} 处理完成: 成功={success}, 动作数={actions_count}") - # 在处理完成后,清除该流的未读消息 - try: - from src.chat.message_manager.message_manager import message_manager - - await message_manager.clear_stream_unread_messages(stream_id) - except Exception as clear_e: - logger.error(f"清除流 {stream_id} 未读消息时发生错误: {clear_e}") - return result + except asyncio.CancelledError: + self.stats["failed_executions"] += 1 + logger.info(f"流 {stream_id} 处理被取消,不清空未读消息") + raise except Exception as e: self.stats["failed_executions"] += 1 logger.error(f"处理流 {stream_id} 时发生错误: {e}") diff --git a/src/chat/message_manager/context_manager.py b/src/chat/message_manager/context_manager.py index ea37ad0be..d39b599fc 100644 --- a/src/chat/message_manager/context_manager.py +++ b/src/chat/message_manager/context_manager.py @@ -55,7 +55,51 @@ class SingleStreamContextManager: bool: 是否成功添加 """ try: - # 直接操作上下文的消息列表 + # 使用MessageManager的内置缓存系统 + try: + from .message_manager import message_manager + + # 如果MessageManager正在运行,使用缓存系统 + if message_manager.is_running: + # 先计算兴趣值(需要在缓存前计算) + await self._calculate_message_interest(message) + message.is_read = False + + # 添加到缓存而不是直接添加到未读消息 + cache_success = message_manager.add_message_to_cache(self.stream_id, message) + + if cache_success: + # 自动检测和更新chat type + self._detect_chat_type(message) + + self.total_messages += 1 + self.last_access_time = time.time() + + # 检查当前是否正在处理消息 + is_processing = message_manager.get_stream_processing_status(self.stream_id) + + if not is_processing: + # 如果当前没有在处理,立即刷新缓存到未读消息 + cached_messages = message_manager.flush_cached_messages(self.stream_id) + for cached_msg in cached_messages: + self.context.unread_messages.append(cached_msg) + logger.debug(f"立即刷新缓存到未读消息: stream={self.stream_id}, 数量={len(cached_messages)}") + else: + logger.debug(f"消息已缓存,等待当前处理完成: stream={self.stream_id}") + + # 启动流的循环任务(如果还未启动) + asyncio.create_task(stream_loop_manager.start_stream_loop(self.stream_id)) + logger.debug(f"添加消息到缓存系统: {self.stream_id}") + return True + else: + logger.warning(f"消息缓存系统添加失败,回退到直接添加: {self.stream_id}") + + except ImportError: + logger.debug("MessageManager不可用,使用直接添加模式") + except Exception as e: + logger.warning(f"消息缓存系统异常,回退到直接添加: {self.stream_id}, error={e}") + + # 回退方案:直接添加到未读消息 message.is_read = False self.context.unread_messages.append(message) diff --git a/src/chat/message_manager/distribution_manager.py b/src/chat/message_manager/distribution_manager.py index d71dd9370..0eac8bb44 100644 --- a/src/chat/message_manager/distribution_manager.py +++ b/src/chat/message_manager/distribution_manager.py @@ -364,9 +364,17 @@ class StreamLoopManager: logger.warning(f"Chatter管理器未设置: {stream_id}") return False + # 设置处理状态为正在处理 + self._set_stream_processing_status(stream_id, True) + try: start_time = time.time() + # 在处理开始前,先刷新缓存到未读消息 + cached_messages = await self._flush_cached_messages_to_unread(stream_id) + if cached_messages: + logger.info(f"处理开始前刷新缓存消息: stream={stream_id}, 数量={len(cached_messages)}") + # 直接调用chatter_manager处理流上下文 task = asyncio.create_task(self.chatter_manager.process_stream_context(stream_id, context)) self.chatter_manager.set_processing_task(stream_id, task) @@ -374,6 +382,11 @@ class StreamLoopManager: success = results.get("success", False) if success: + # 处理成功后,再次刷新缓存中可能的新消息 + additional_messages = await self._flush_cached_messages_to_unread(stream_id) + if additional_messages: + logger.info(f"处理完成后刷新新消息: stream={stream_id}, 数量={len(additional_messages)}") + asyncio.create_task(self._refresh_focus_energy(stream_id)) process_time = time.time() - start_time logger.debug(f"流处理成功: {stream_id} (耗时: {process_time:.2f}s)") @@ -385,6 +398,57 @@ class StreamLoopManager: except Exception as e: logger.error(f"流处理异常: {stream_id} - {e}", exc_info=True) return False + finally: + # 无论成功或失败,都要设置处理状态为未处理 + self._set_stream_processing_status(stream_id, False) + + def _set_stream_processing_status(self, stream_id: str, is_processing: bool) -> None: + """设置流的处理状态""" + try: + from .message_manager import message_manager + + if message_manager.is_running: + message_manager.set_stream_processing_status(stream_id, is_processing) + logger.debug(f"设置流处理状态: stream={stream_id}, processing={is_processing}") + + except ImportError: + logger.debug("MessageManager不可用,跳过状态设置") + except Exception as e: + logger.warning(f"设置流处理状态失败: stream={stream_id}, error={e}") + + async def _flush_cached_messages_to_unread(self, stream_id: str) -> list: + """将缓存消息刷新到未读消息列表""" + try: + from .message_manager import message_manager + + if message_manager.is_running and message_manager.has_cached_messages(stream_id): + # 获取缓存消息 + cached_messages = message_manager.flush_cached_messages(stream_id) + + if cached_messages: + # 获取聊天流并添加到未读消息 + from src.plugin_system.apis.chat_api import get_chat_manager + + chat_manager = get_chat_manager() + chat_stream = await chat_manager.get_stream(stream_id) + + if chat_stream: + for message in cached_messages: + chat_stream.context_manager.context.unread_messages.append(message) + logger.debug(f"刷新缓存消息到未读列表: stream={stream_id}, 数量={len(cached_messages)}") + else: + logger.warning(f"无法找到聊天流: {stream_id}") + + return cached_messages + + return [] + + except ImportError: + logger.debug("MessageManager不可用,跳过缓存刷新") + return [] + except Exception as e: + logger.warning(f"刷新缓存消息失败: stream={stream_id}, error={e}") + return [] async def _calculate_interval(self, stream_id: str, has_messages: bool) -> float: """计算下次检查间隔 diff --git a/src/chat/message_manager/message_manager.py b/src/chat/message_manager/message_manager.py index b8f2ab643..706431138 100644 --- a/src/chat/message_manager/message_manager.py +++ b/src/chat/message_manager/message_manager.py @@ -6,6 +6,7 @@ import asyncio import random import time +from collections import defaultdict, deque from typing import TYPE_CHECKING, Any from src.chat.chatter_manager import ChatterManager @@ -46,6 +47,14 @@ class MessageManager: self.sleep_manager = SleepManager() self.wakeup_manager = WakeUpManager(self.sleep_manager) + # 消息缓存系统 - 直接集成到消息管理器 + self.message_caches: Dict[str, deque] = defaultdict(deque) # 每个流的消息缓存 + self.stream_processing_status: Dict[str, bool] = defaultdict(bool) # 流的处理状态 + self.cache_stats = { + "total_cached_messages": 0, + "total_flushed_messages": 0, + } + # 不再需要全局上下文管理器,直接通过 ChatManager 访问各个 ChatStream 的 context_manager async def start(self): @@ -72,6 +81,9 @@ class MessageManager: except Exception as e: logger.error(f"启动流缓存管理器失败: {e}") + # 启动消息缓存系统(内置) + logger.info("📦 消息缓存系统已启动") + # 启动自适应流管理器 try: from src.chat.message_manager.adaptive_stream_manager import init_adaptive_stream_manager @@ -115,6 +127,11 @@ class MessageManager: except Exception as e: logger.error(f"停止流缓存管理器失败: {e}") + # 停止消息缓存系统(内置) + self.message_caches.clear() + self.stream_processing_status.clear() + logger.info("📦 消息缓存系统已停止") + # 停止自适应流管理器 try: from src.chat.message_manager.adaptive_stream_manager import shutdown_adaptive_stream_manager @@ -429,6 +446,115 @@ class MessageManager: except Exception as e: logger.error(f"清除流 {stream_id} 的未读消息时发生错误: {e}") + # ===== 消息缓存系统方法 ===== + + def add_message_to_cache(self, stream_id: str, message: DatabaseMessages) -> bool: + """添加消息到缓存 + + Args: + stream_id: 流ID + message: 消息对象 + + Returns: + bool: 是否成功添加到缓存 + """ + try: + if not self.is_running: + return False + + self.message_caches[stream_id].append(message) + self.cache_stats["total_cached_messages"] += 1 + + logger.debug(f"消息已添加到缓存: stream={stream_id}, content={message.processed_plain_text[:50]}...") + return True + except Exception as e: + logger.error(f"添加消息到缓存失败: stream={stream_id}, error={e}") + return False + + def flush_cached_messages(self, stream_id: str) -> list[DatabaseMessages]: + """刷新缓存消息到未读消息列表 + + Args: + stream_id: 流ID + + Returns: + List[DatabaseMessages]: 缓存的消息列表 + """ + try: + if stream_id not in self.message_caches: + return [] + + cached_messages = list(self.message_caches[stream_id]) + self.message_caches[stream_id].clear() + + self.cache_stats["total_flushed_messages"] += len(cached_messages) + + logger.debug(f"刷新缓存消息: stream={stream_id}, 数量={len(cached_messages)}") + return cached_messages + except Exception as e: + logger.error(f"刷新缓存消息失败: stream={stream_id}, error={e}") + return [] + + def set_stream_processing_status(self, stream_id: str, is_processing: bool): + """设置流的处理状态 + + Args: + stream_id: 流ID + is_processing: 是否正在处理 + """ + try: + self.stream_processing_status[stream_id] = is_processing + logger.debug(f"设置流处理状态: stream={stream_id}, processing={is_processing}") + except Exception as e: + logger.error(f"设置流处理状态失败: stream={stream_id}, error={e}") + + def get_stream_processing_status(self, stream_id: str) -> bool: + """获取流的处理状态 + + Args: + stream_id: 流ID + + Returns: + bool: 是否正在处理 + """ + return self.stream_processing_status.get(stream_id, False) + + def has_cached_messages(self, stream_id: str) -> bool: + """检查流是否有缓存消息 + + Args: + stream_id: 流ID + + Returns: + bool: 是否有缓存消息 + """ + return stream_id in self.message_caches and len(self.message_caches[stream_id]) > 0 + + def get_cached_messages_count(self, stream_id: str) -> int: + """获取流的缓存消息数量 + + Args: + stream_id: 流ID + + Returns: + int: 缓存消息数量 + """ + return len(self.message_caches.get(stream_id, [])) + + def get_cache_stats(self) -> dict[str, Any]: + """获取缓存统计信息 + + Returns: + Dict[str, Any]: 缓存统计信息 + """ + return { + "total_cached_messages": self.cache_stats["total_cached_messages"], + "total_flushed_messages": self.cache_stats["total_flushed_messages"], + "active_caches": len(self.message_caches), + "cached_streams": len([s for s in self.message_caches.keys() if self.message_caches[s]]), + "processing_streams": len([s for s in self.stream_processing_status.keys() if self.stream_processing_status[s]]), + } + # 创建全局消息管理器实例 message_manager = MessageManager()