feat(context_manager): 添加消息处理日志,优化消息标记为已读的逻辑
This commit is contained in:
@@ -82,6 +82,11 @@ class SingleStreamContextManager:
|
||||
|
||||
self.total_messages += 1
|
||||
self.last_access_time = time.time()
|
||||
|
||||
# 记录消息添加日志
|
||||
msg_preview = message.processed_plain_text[:30] if message.processed_plain_text else "(无内容)"
|
||||
msg_id_str = str(message.message_id)[:8] if message.message_id else "unknown"
|
||||
logger.info(f"➕ [添加消息] {msg_id_str}: {msg_preview}..., 当前未读数: {len(self.context.unread_messages)}, 历史数: {len(self.context.history_messages)}")
|
||||
|
||||
# 如果使用了缓存系统,输出调试信息
|
||||
if cache_enabled and self.context.is_cache_enabled:
|
||||
@@ -112,9 +117,9 @@ class SingleStreamContextManager:
|
||||
bool: 是否成功更新
|
||||
"""
|
||||
try:
|
||||
# 直接在未读消息中查找并更新
|
||||
# 直接在未读消息中查找并更新(统一转字符串比较)
|
||||
for message in self.context.unread_messages:
|
||||
if message.message_id == message_id:
|
||||
if str(message.message_id) == str(message_id):
|
||||
if "interest_value" in updates:
|
||||
message.interest_value = updates["interest_value"]
|
||||
if "actions" in updates:
|
||||
@@ -123,9 +128,9 @@ class SingleStreamContextManager:
|
||||
message.should_reply = updates["should_reply"]
|
||||
break
|
||||
|
||||
# 在历史消息中查找并更新
|
||||
# 在历史消息中查找并更新(统一转字符串比较)
|
||||
for message in self.context.history_messages:
|
||||
if message.message_id == message_id:
|
||||
if str(message.message_id) == str(message_id):
|
||||
if "interest_value" in updates:
|
||||
message.interest_value = updates["interest_value"]
|
||||
if "actions" in updates:
|
||||
@@ -189,14 +194,20 @@ class SingleStreamContextManager:
|
||||
return False
|
||||
|
||||
marked_count = 0
|
||||
failed_ids = []
|
||||
for message_id in message_ids:
|
||||
try:
|
||||
self.context.mark_message_as_read(message_id)
|
||||
marked_count += 1
|
||||
except Exception as e:
|
||||
failed_ids.append(str(message_id)[:8])
|
||||
logger.warning(f"标记消息已读失败 {message_id}: {e}")
|
||||
|
||||
logger.debug(f"标记消息为已读: {self.stream_id} ({marked_count}/{len(message_ids)}条)")
|
||||
if marked_count > 0:
|
||||
logger.info(f"✅ [批量标记] stream={self.stream_id[:8]}, 成功标记 {marked_count}/{len(message_ids)} 条消息为已读")
|
||||
if failed_ids:
|
||||
logger.warning(f"⚠️ [批量标记] stream={self.stream_id[:8]}, {len(failed_ids)} 条消息标记失败: {failed_ids[:5]}")
|
||||
|
||||
return marked_count > 0
|
||||
|
||||
except Exception as e:
|
||||
@@ -321,11 +332,11 @@ class SingleStreamContextManager:
|
||||
async def _initialize_history_from_db(self):
|
||||
"""从数据库初始化历史消息到context中"""
|
||||
if self._history_initialized:
|
||||
logger.debug(f"历史消息已初始化,跳过: {self.stream_id}")
|
||||
logger.debug(f"历史消息已初始化,跳过: {self.stream_id}, 当前历史消息数: {len(self.context.history_messages)}")
|
||||
return
|
||||
|
||||
# 立即设置标志,防止并发重复加载
|
||||
logger.debug(f"设置历史初始化标志: {self.stream_id}")
|
||||
logger.info(f"🔄 [历史加载] 开始从数据库加载历史消息: {self.stream_id}")
|
||||
self._history_initialized = True
|
||||
|
||||
try:
|
||||
@@ -341,7 +352,9 @@ class SingleStreamContextManager:
|
||||
)
|
||||
|
||||
if db_messages:
|
||||
logger.info(f"📥 [历史加载] 从数据库获取到 {len(db_messages)} 条消息")
|
||||
# 将数据库消息转换为 DatabaseMessages 对象并添加到历史
|
||||
loaded_count = 0
|
||||
for msg_dict in db_messages:
|
||||
try:
|
||||
# 使用 ** 解包字典作为关键字参数
|
||||
@@ -352,12 +365,13 @@ class SingleStreamContextManager:
|
||||
|
||||
# 添加到历史消息
|
||||
self.context.history_messages.append(db_msg)
|
||||
loaded_count += 1
|
||||
|
||||
except Exception as e:
|
||||
logger.warning(f"转换历史消息失败 (message_id={msg_dict.get('message_id', 'unknown')}): {e}")
|
||||
continue
|
||||
|
||||
logger.debug(f"成功从数据库加载 {len(self.context.history_messages)} 条历史消息到内存: {self.stream_id}")
|
||||
logger.info(f"✅ [历史加载] 成功加载 {loaded_count} 条历史消息到内存: {self.stream_id}")
|
||||
else:
|
||||
logger.debug(f"没有历史消息需要加载: {self.stream_id}")
|
||||
|
||||
|
||||
@@ -414,9 +414,13 @@ class MessageManager:
|
||||
# 获取未读消息
|
||||
unread_messages = chat_stream.context_manager.get_unread_messages()
|
||||
if not unread_messages:
|
||||
logger.info(f"🧹 [清除未读] stream={stream_id[:8]}, 无未读消息需要清除")
|
||||
return
|
||||
|
||||
logger.warning(f"正在清除 {len(unread_messages)} 条未读消息")
|
||||
# 记录详细信息
|
||||
msg_previews = [f"{str(msg.message_id)[:8] if msg.message_id else 'unknown'}:{msg.processed_plain_text[:20] if msg.processed_plain_text else '(空)'}"
|
||||
for msg in unread_messages[:3]] # 只显示前3条
|
||||
logger.info(f"🧹 [清除未读] stream={stream_id[:8]}, 开始清除 {len(unread_messages)} 条未读消息, 示例: {msg_previews}")
|
||||
|
||||
# 将所有未读消息标记为已读
|
||||
message_ids = [msg.message_id for msg in unread_messages]
|
||||
@@ -424,9 +428,9 @@ class MessageManager:
|
||||
|
||||
if success:
|
||||
self.stats.total_processed_messages += len(unread_messages)
|
||||
logger.debug(f"强制清除 {len(unread_messages)} 条消息,标记为已读")
|
||||
logger.info(f"✅ [清除未读] stream={stream_id[:8]}, 成功清除并标记 {len(unread_messages)} 条消息为已读")
|
||||
else:
|
||||
logger.error("标记未读消息为已读失败")
|
||||
logger.error(f"❌ [清除未读] stream={stream_id[:8]}, 标记未读消息为已读失败")
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"清除未读消息时发生错误: {e}")
|
||||
|
||||
@@ -85,26 +85,41 @@ class StreamContext(BaseDataModel):
|
||||
message_id: 消息ID
|
||||
action: 要添加的动作名称
|
||||
"""
|
||||
# 在未读消息中查找并更新
|
||||
# 在未读消息中查找并更新(统一转字符串比较)
|
||||
for message in self.unread_messages:
|
||||
if message.message_id == message_id:
|
||||
if str(message.message_id) == str(message_id):
|
||||
message.add_action(action)
|
||||
break
|
||||
|
||||
# 在历史消息中查找并更新
|
||||
# 在历史消息中查找并更新(统一转字符串比较)
|
||||
for message in self.history_messages:
|
||||
if message.message_id == message_id:
|
||||
if str(message.message_id) == str(message_id):
|
||||
message.add_action(action)
|
||||
break
|
||||
|
||||
def mark_message_as_read(self, message_id: str):
|
||||
"""标记消息为已读"""
|
||||
from src.common.logger import get_logger
|
||||
logger = get_logger("StreamContext")
|
||||
|
||||
# 先找到要标记的消息(处理 int/str 类型不匹配问题)
|
||||
message_to_mark = None
|
||||
for msg in self.unread_messages:
|
||||
if msg.message_id == message_id:
|
||||
msg.is_read = True
|
||||
self.history_messages.append(msg)
|
||||
self.unread_messages.remove(msg)
|
||||
# 统一转换为字符串比较,避免 int vs str 导致的匹配失败
|
||||
if str(msg.message_id) == str(message_id):
|
||||
message_to_mark = msg
|
||||
break
|
||||
|
||||
# 然后移动到历史消息
|
||||
if message_to_mark:
|
||||
message_to_mark.is_read = True
|
||||
self.history_messages.append(message_to_mark)
|
||||
self.unread_messages.remove(message_to_mark)
|
||||
msg_id_str = str(message_id)[:8] if message_id else "unknown"
|
||||
logger.info(f"📌 [标记已读] 消息 {msg_id_str} 已移至历史, 当前历史数: {len(self.history_messages)}, 未读数: {len(self.unread_messages)}")
|
||||
else:
|
||||
msg_id_str = str(message_id)[:8] if message_id else "unknown"
|
||||
logger.warning(f"⚠️ [标记已读] 未找到消息 {msg_id_str} 在未读列表中, 当前未读消息ID列表: {[str(m.message_id)[:8] for m in self.unread_messages[:5]]}")
|
||||
|
||||
def get_unread_messages(self) -> list["DatabaseMessages"]:
|
||||
"""获取未读消息"""
|
||||
|
||||
@@ -362,6 +362,9 @@ class ChatterPlanFilter:
|
||||
return "最近没有聊天内容。", "没有未读消息。", []
|
||||
|
||||
stream_context = chat_stream.context_manager
|
||||
|
||||
# 🔥 确保历史消息已从数据库加载
|
||||
await stream_context.ensure_history_initialized()
|
||||
|
||||
# 获取真正的已读和未读消息
|
||||
read_messages = stream_context.context.history_messages # 已读消息存储在history_messages中
|
||||
|
||||
Reference in New Issue
Block a user