diff --git a/src/chat/message_manager/context_manager.py b/src/chat/message_manager/context_manager.py index c9ad3a674..e0be46872 100644 --- a/src/chat/message_manager/context_manager.py +++ b/src/chat/message_manager/context_manager.py @@ -82,6 +82,11 @@ class SingleStreamContextManager: self.total_messages += 1 self.last_access_time = time.time() + + # 记录消息添加日志 + msg_preview = message.processed_plain_text[:30] if message.processed_plain_text else "(无内容)" + msg_id_str = str(message.message_id)[:8] if message.message_id else "unknown" + logger.info(f"➕ [添加消息] {msg_id_str}: {msg_preview}..., 当前未读数: {len(self.context.unread_messages)}, 历史数: {len(self.context.history_messages)}") # 如果使用了缓存系统,输出调试信息 if cache_enabled and self.context.is_cache_enabled: @@ -112,9 +117,9 @@ class SingleStreamContextManager: bool: 是否成功更新 """ try: - # 直接在未读消息中查找并更新 + # 直接在未读消息中查找并更新(统一转字符串比较) for message in self.context.unread_messages: - if message.message_id == message_id: + if str(message.message_id) == str(message_id): if "interest_value" in updates: message.interest_value = updates["interest_value"] if "actions" in updates: @@ -123,9 +128,9 @@ class SingleStreamContextManager: message.should_reply = updates["should_reply"] break - # 在历史消息中查找并更新 + # 在历史消息中查找并更新(统一转字符串比较) for message in self.context.history_messages: - if message.message_id == message_id: + if str(message.message_id) == str(message_id): if "interest_value" in updates: message.interest_value = updates["interest_value"] if "actions" in updates: @@ -189,14 +194,20 @@ class SingleStreamContextManager: return False marked_count = 0 + failed_ids = [] for message_id in message_ids: try: self.context.mark_message_as_read(message_id) marked_count += 1 except Exception as e: + failed_ids.append(str(message_id)[:8]) logger.warning(f"标记消息已读失败 {message_id}: {e}") - logger.debug(f"标记消息为已读: {self.stream_id} ({marked_count}/{len(message_ids)}条)") + if marked_count > 0: + logger.info(f"✅ [批量标记] stream={self.stream_id[:8]}, 成功标记 {marked_count}/{len(message_ids)} 条消息为已读") + if failed_ids: + logger.warning(f"⚠️ [批量标记] stream={self.stream_id[:8]}, {len(failed_ids)} 条消息标记失败: {failed_ids[:5]}") + return marked_count > 0 except Exception as e: @@ -321,11 +332,11 @@ class SingleStreamContextManager: async def _initialize_history_from_db(self): """从数据库初始化历史消息到context中""" if self._history_initialized: - logger.debug(f"历史消息已初始化,跳过: {self.stream_id}") + logger.debug(f"历史消息已初始化,跳过: {self.stream_id}, 当前历史消息数: {len(self.context.history_messages)}") return # 立即设置标志,防止并发重复加载 - logger.debug(f"设置历史初始化标志: {self.stream_id}") + logger.info(f"🔄 [历史加载] 开始从数据库加载历史消息: {self.stream_id}") self._history_initialized = True try: @@ -341,7 +352,9 @@ class SingleStreamContextManager: ) if db_messages: + logger.info(f"📥 [历史加载] 从数据库获取到 {len(db_messages)} 条消息") # 将数据库消息转换为 DatabaseMessages 对象并添加到历史 + loaded_count = 0 for msg_dict in db_messages: try: # 使用 ** 解包字典作为关键字参数 @@ -352,12 +365,13 @@ class SingleStreamContextManager: # 添加到历史消息 self.context.history_messages.append(db_msg) + loaded_count += 1 except Exception as e: logger.warning(f"转换历史消息失败 (message_id={msg_dict.get('message_id', 'unknown')}): {e}") continue - logger.debug(f"成功从数据库加载 {len(self.context.history_messages)} 条历史消息到内存: {self.stream_id}") + logger.info(f"✅ [历史加载] 成功加载 {loaded_count} 条历史消息到内存: {self.stream_id}") else: logger.debug(f"没有历史消息需要加载: {self.stream_id}") diff --git a/src/chat/message_manager/message_manager.py b/src/chat/message_manager/message_manager.py index 6b1b15659..12f22468a 100644 --- a/src/chat/message_manager/message_manager.py +++ b/src/chat/message_manager/message_manager.py @@ -414,9 +414,13 @@ class MessageManager: # 获取未读消息 unread_messages = chat_stream.context_manager.get_unread_messages() if not unread_messages: + logger.info(f"🧹 [清除未读] stream={stream_id[:8]}, 无未读消息需要清除") return - logger.warning(f"正在清除 {len(unread_messages)} 条未读消息") + # 记录详细信息 + msg_previews = [f"{str(msg.message_id)[:8] if msg.message_id else 'unknown'}:{msg.processed_plain_text[:20] if msg.processed_plain_text else '(空)'}" + for msg in unread_messages[:3]] # 只显示前3条 + logger.info(f"🧹 [清除未读] stream={stream_id[:8]}, 开始清除 {len(unread_messages)} 条未读消息, 示例: {msg_previews}") # 将所有未读消息标记为已读 message_ids = [msg.message_id for msg in unread_messages] @@ -424,9 +428,9 @@ class MessageManager: if success: self.stats.total_processed_messages += len(unread_messages) - logger.debug(f"强制清除 {len(unread_messages)} 条消息,标记为已读") + logger.info(f"✅ [清除未读] stream={stream_id[:8]}, 成功清除并标记 {len(unread_messages)} 条消息为已读") else: - logger.error("标记未读消息为已读失败") + logger.error(f"❌ [清除未读] stream={stream_id[:8]}, 标记未读消息为已读失败") except Exception as e: logger.error(f"清除未读消息时发生错误: {e}") diff --git a/src/common/data_models/message_manager_data_model.py b/src/common/data_models/message_manager_data_model.py index 801dc881b..541b39bea 100644 --- a/src/common/data_models/message_manager_data_model.py +++ b/src/common/data_models/message_manager_data_model.py @@ -85,26 +85,41 @@ class StreamContext(BaseDataModel): message_id: 消息ID action: 要添加的动作名称 """ - # 在未读消息中查找并更新 + # 在未读消息中查找并更新(统一转字符串比较) for message in self.unread_messages: - if message.message_id == message_id: + if str(message.message_id) == str(message_id): message.add_action(action) break - # 在历史消息中查找并更新 + # 在历史消息中查找并更新(统一转字符串比较) for message in self.history_messages: - if message.message_id == message_id: + if str(message.message_id) == str(message_id): message.add_action(action) break def mark_message_as_read(self, message_id: str): """标记消息为已读""" + from src.common.logger import get_logger + logger = get_logger("StreamContext") + + # 先找到要标记的消息(处理 int/str 类型不匹配问题) + message_to_mark = None for msg in self.unread_messages: - if msg.message_id == message_id: - msg.is_read = True - self.history_messages.append(msg) - self.unread_messages.remove(msg) + # 统一转换为字符串比较,避免 int vs str 导致的匹配失败 + if str(msg.message_id) == str(message_id): + message_to_mark = msg break + + # 然后移动到历史消息 + if message_to_mark: + message_to_mark.is_read = True + self.history_messages.append(message_to_mark) + self.unread_messages.remove(message_to_mark) + msg_id_str = str(message_id)[:8] if message_id else "unknown" + logger.info(f"📌 [标记已读] 消息 {msg_id_str} 已移至历史, 当前历史数: {len(self.history_messages)}, 未读数: {len(self.unread_messages)}") + else: + msg_id_str = str(message_id)[:8] if message_id else "unknown" + logger.warning(f"⚠️ [标记已读] 未找到消息 {msg_id_str} 在未读列表中, 当前未读消息ID列表: {[str(m.message_id)[:8] for m in self.unread_messages[:5]]}") def get_unread_messages(self) -> list["DatabaseMessages"]: """获取未读消息""" diff --git a/src/plugins/built_in/affinity_flow_chatter/planner/plan_filter.py b/src/plugins/built_in/affinity_flow_chatter/planner/plan_filter.py index d430e909e..231f11d8a 100644 --- a/src/plugins/built_in/affinity_flow_chatter/planner/plan_filter.py +++ b/src/plugins/built_in/affinity_flow_chatter/planner/plan_filter.py @@ -362,6 +362,9 @@ class ChatterPlanFilter: return "最近没有聊天内容。", "没有未读消息。", [] stream_context = chat_stream.context_manager + + # 🔥 确保历史消息已从数据库加载 + await stream_context.ensure_history_initialized() # 获取真正的已读和未读消息 read_messages = stream_context.context.history_messages # 已读消息存储在history_messages中