This commit is contained in:
tt-P607
2025-10-07 16:47:50 +08:00
12 changed files with 490 additions and 267 deletions

View File

@@ -5,7 +5,7 @@
"""
import datetime
from typing import Any, Optional, TypedDict, Literal, Union, Callable, TypeVar, cast
from typing import Any, Optional, TypeVar, cast
from sqlalchemy import select, delete
@@ -47,29 +47,26 @@ class AntiInjectionStatistics:
"""当前会话开始时间"""
@staticmethod
async def get_or_create_stats() -> Optional[AntiInjectionStats]: # type: ignore[name-defined]
async def get_or_create_stats() -> AntiInjectionStats:
"""获取或创建统计记录
Returns:
AntiInjectionStats | None: 成功返回模型实例,否则 None
"""
try:
async with get_db_session() as session:
async with get_db_session() as session:
# 获取最新的统计记录,如果没有则创建
stats = (
stats = (
(await session.execute(select(AntiInjectionStats).order_by(AntiInjectionStats.id.desc())))
.scalars()
.first()
)
if not stats:
stats = AntiInjectionStats()
session.add(stats)
await session.commit()
await session.refresh(stats)
return stats
except Exception as e:
logger.error(f"获取统计记录失败: {e}")
return None
if not stats:
stats = AntiInjectionStats()
session.add(stats)
await session.commit()
await session.refresh(stats)
return stats
@staticmethod
async def update_stats(**kwargs: Any) -> None:
@@ -97,7 +94,7 @@ class AntiInjectionStatistics:
if key == "processing_time_delta":
# 处理时间累加 - 确保不为 None
delta = float(value)
stats.processing_time_total = _add_optional(stats.processing_time_total, delta) # type: ignore[attr-defined]
stats.processing_time_total = _add_optional(stats.processing_time_total, delta)
continue
elif key == "last_processing_time":
# 直接设置最后处理时间
@@ -146,7 +143,7 @@ class AntiInjectionStatistics:
# 计算派生统计信息 - 处理 None 值
total_messages = stats.total_messages or 0 # type: ignore[attr-defined]
total_messages = stats.total_messages or 0
detected_injections = stats.detected_injections or 0 # type: ignore[attr-defined]
processing_time_total = stats.processing_time_total or 0.0 # type: ignore[attr-defined]

View File

@@ -110,7 +110,23 @@ class ChatterManager:
self.stats["streams_processed"] += 1
try:
result = await self.instances[stream_id].execute(context)
self.stats["successful_executions"] += 1
# 检查执行结果是否真正成功
success = result.get("success", False)
if success:
self.stats["successful_executions"] += 1
# 只有真正成功时才清空未读消息
try:
from src.chat.message_manager.message_manager import message_manager
await message_manager.clear_stream_unread_messages(stream_id)
logger.debug(f"{stream_id} 处理成功,已清空未读消息")
except Exception as clear_e:
logger.error(f"清除流 {stream_id} 未读消息时发生错误: {clear_e}")
else:
self.stats["failed_executions"] += 1
logger.warning(f"{stream_id} 处理失败,不清空未读消息")
# 从 mood_manager 获取最新的 chat_stream 并同步回 StreamContext
try:
@@ -124,19 +140,14 @@ class ChatterManager:
logger.error(f"同步 chat_stream 回 StreamContext 失败: {sync_e}")
# 记录处理结果
success = result.get("success", False)
actions_count = result.get("actions_count", 0)
logger.debug(f"{stream_id} 处理完成: 成功={success}, 动作数={actions_count}")
# 在处理完成后,清除该流的未读消息
try:
from src.chat.message_manager.message_manager import message_manager
await message_manager.clear_stream_unread_messages(stream_id)
except Exception as clear_e:
logger.error(f"清除流 {stream_id} 未读消息时发生错误: {clear_e}")
return result
except asyncio.CancelledError:
self.stats["failed_executions"] += 1
logger.info(f"{stream_id} 处理被取消,不清空未读消息")
raise
except Exception as e:
self.stats["failed_executions"] += 1
logger.error(f"处理流 {stream_id} 时发生错误: {e}")

View File

@@ -55,7 +55,51 @@ class SingleStreamContextManager:
bool: 是否成功添加
"""
try:
# 直接操作上下文的消息列表
# 使用MessageManager的内置缓存系统
try:
from .message_manager import message_manager
# 如果MessageManager正在运行使用缓存系统
if message_manager.is_running:
# 先计算兴趣值(需要在缓存前计算)
await self._calculate_message_interest(message)
message.is_read = False
# 添加到缓存而不是直接添加到未读消息
cache_success = message_manager.add_message_to_cache(self.stream_id, message)
if cache_success:
# 自动检测和更新chat type
self._detect_chat_type(message)
self.total_messages += 1
self.last_access_time = time.time()
# 检查当前是否正在处理消息
is_processing = message_manager.get_stream_processing_status(self.stream_id)
if not is_processing:
# 如果当前没有在处理,立即刷新缓存到未读消息
cached_messages = message_manager.flush_cached_messages(self.stream_id)
for cached_msg in cached_messages:
self.context.unread_messages.append(cached_msg)
logger.debug(f"立即刷新缓存到未读消息: stream={self.stream_id}, 数量={len(cached_messages)}")
else:
logger.debug(f"消息已缓存,等待当前处理完成: stream={self.stream_id}")
# 启动流的循环任务(如果还未启动)
asyncio.create_task(stream_loop_manager.start_stream_loop(self.stream_id))
logger.debug(f"添加消息到缓存系统: {self.stream_id}")
return True
else:
logger.warning(f"消息缓存系统添加失败,回退到直接添加: {self.stream_id}")
except ImportError:
logger.debug("MessageManager不可用使用直接添加模式")
except Exception as e:
logger.warning(f"消息缓存系统异常,回退到直接添加: {self.stream_id}, error={e}")
# 回退方案:直接添加到未读消息
message.is_read = False
self.context.unread_messages.append(message)

View File

@@ -364,9 +364,17 @@ class StreamLoopManager:
logger.warning(f"Chatter管理器未设置: {stream_id}")
return False
# 设置处理状态为正在处理
self._set_stream_processing_status(stream_id, True)
try:
start_time = time.time()
# 在处理开始前,先刷新缓存到未读消息
cached_messages = await self._flush_cached_messages_to_unread(stream_id)
if cached_messages:
logger.info(f"处理开始前刷新缓存消息: stream={stream_id}, 数量={len(cached_messages)}")
# 直接调用chatter_manager处理流上下文
task = asyncio.create_task(self.chatter_manager.process_stream_context(stream_id, context))
self.chatter_manager.set_processing_task(stream_id, task)
@@ -374,6 +382,11 @@ class StreamLoopManager:
success = results.get("success", False)
if success:
# 处理成功后,再次刷新缓存中可能的新消息
additional_messages = await self._flush_cached_messages_to_unread(stream_id)
if additional_messages:
logger.info(f"处理完成后刷新新消息: stream={stream_id}, 数量={len(additional_messages)}")
asyncio.create_task(self._refresh_focus_energy(stream_id))
process_time = time.time() - start_time
logger.debug(f"流处理成功: {stream_id} (耗时: {process_time:.2f}s)")
@@ -385,6 +398,57 @@ class StreamLoopManager:
except Exception as e:
logger.error(f"流处理异常: {stream_id} - {e}", exc_info=True)
return False
finally:
# 无论成功或失败,都要设置处理状态为未处理
self._set_stream_processing_status(stream_id, False)
def _set_stream_processing_status(self, stream_id: str, is_processing: bool) -> None:
"""设置流的处理状态"""
try:
from .message_manager import message_manager
if message_manager.is_running:
message_manager.set_stream_processing_status(stream_id, is_processing)
logger.debug(f"设置流处理状态: stream={stream_id}, processing={is_processing}")
except ImportError:
logger.debug("MessageManager不可用跳过状态设置")
except Exception as e:
logger.warning(f"设置流处理状态失败: stream={stream_id}, error={e}")
async def _flush_cached_messages_to_unread(self, stream_id: str) -> list:
"""将缓存消息刷新到未读消息列表"""
try:
from .message_manager import message_manager
if message_manager.is_running and message_manager.has_cached_messages(stream_id):
# 获取缓存消息
cached_messages = message_manager.flush_cached_messages(stream_id)
if cached_messages:
# 获取聊天流并添加到未读消息
from src.plugin_system.apis.chat_api import get_chat_manager
chat_manager = get_chat_manager()
chat_stream = await chat_manager.get_stream(stream_id)
if chat_stream:
for message in cached_messages:
chat_stream.context_manager.context.unread_messages.append(message)
logger.debug(f"刷新缓存消息到未读列表: stream={stream_id}, 数量={len(cached_messages)}")
else:
logger.warning(f"无法找到聊天流: {stream_id}")
return cached_messages
return []
except ImportError:
logger.debug("MessageManager不可用跳过缓存刷新")
return []
except Exception as e:
logger.warning(f"刷新缓存消息失败: stream={stream_id}, error={e}")
return []
async def _calculate_interval(self, stream_id: str, has_messages: bool) -> float:
"""计算下次检查间隔

View File

@@ -6,6 +6,7 @@
import asyncio
import random
import time
from collections import defaultdict, deque
from typing import TYPE_CHECKING, Any
from src.chat.chatter_manager import ChatterManager
@@ -46,6 +47,14 @@ class MessageManager:
self.sleep_manager = SleepManager()
self.wakeup_manager = WakeUpManager(self.sleep_manager)
# 消息缓存系统 - 直接集成到消息管理器
self.message_caches: Dict[str, deque] = defaultdict(deque) # 每个流的消息缓存
self.stream_processing_status: Dict[str, bool] = defaultdict(bool) # 流的处理状态
self.cache_stats = {
"total_cached_messages": 0,
"total_flushed_messages": 0,
}
# 不再需要全局上下文管理器,直接通过 ChatManager 访问各个 ChatStream 的 context_manager
async def start(self):
@@ -72,6 +81,9 @@ class MessageManager:
except Exception as e:
logger.error(f"启动流缓存管理器失败: {e}")
# 启动消息缓存系统(内置)
logger.info("📦 消息缓存系统已启动")
# 启动自适应流管理器
try:
from src.chat.message_manager.adaptive_stream_manager import init_adaptive_stream_manager
@@ -115,6 +127,11 @@ class MessageManager:
except Exception as e:
logger.error(f"停止流缓存管理器失败: {e}")
# 停止消息缓存系统(内置)
self.message_caches.clear()
self.stream_processing_status.clear()
logger.info("📦 消息缓存系统已停止")
# 停止自适应流管理器
try:
from src.chat.message_manager.adaptive_stream_manager import shutdown_adaptive_stream_manager
@@ -429,6 +446,115 @@ class MessageManager:
except Exception as e:
logger.error(f"清除流 {stream_id} 的未读消息时发生错误: {e}")
# ===== 消息缓存系统方法 =====
def add_message_to_cache(self, stream_id: str, message: DatabaseMessages) -> bool:
"""添加消息到缓存
Args:
stream_id: 流ID
message: 消息对象
Returns:
bool: 是否成功添加到缓存
"""
try:
if not self.is_running:
return False
self.message_caches[stream_id].append(message)
self.cache_stats["total_cached_messages"] += 1
logger.debug(f"消息已添加到缓存: stream={stream_id}, content={message.processed_plain_text[:50]}...")
return True
except Exception as e:
logger.error(f"添加消息到缓存失败: stream={stream_id}, error={e}")
return False
def flush_cached_messages(self, stream_id: str) -> list[DatabaseMessages]:
"""刷新缓存消息到未读消息列表
Args:
stream_id: 流ID
Returns:
List[DatabaseMessages]: 缓存的消息列表
"""
try:
if stream_id not in self.message_caches:
return []
cached_messages = list(self.message_caches[stream_id])
self.message_caches[stream_id].clear()
self.cache_stats["total_flushed_messages"] += len(cached_messages)
logger.debug(f"刷新缓存消息: stream={stream_id}, 数量={len(cached_messages)}")
return cached_messages
except Exception as e:
logger.error(f"刷新缓存消息失败: stream={stream_id}, error={e}")
return []
def set_stream_processing_status(self, stream_id: str, is_processing: bool):
"""设置流的处理状态
Args:
stream_id: 流ID
is_processing: 是否正在处理
"""
try:
self.stream_processing_status[stream_id] = is_processing
logger.debug(f"设置流处理状态: stream={stream_id}, processing={is_processing}")
except Exception as e:
logger.error(f"设置流处理状态失败: stream={stream_id}, error={e}")
def get_stream_processing_status(self, stream_id: str) -> bool:
"""获取流的处理状态
Args:
stream_id: 流ID
Returns:
bool: 是否正在处理
"""
return self.stream_processing_status.get(stream_id, False)
def has_cached_messages(self, stream_id: str) -> bool:
"""检查流是否有缓存消息
Args:
stream_id: 流ID
Returns:
bool: 是否有缓存消息
"""
return stream_id in self.message_caches and len(self.message_caches[stream_id]) > 0
def get_cached_messages_count(self, stream_id: str) -> int:
"""获取流的缓存消息数量
Args:
stream_id: 流ID
Returns:
int: 缓存消息数量
"""
return len(self.message_caches.get(stream_id, []))
def get_cache_stats(self) -> dict[str, Any]:
"""获取缓存统计信息
Returns:
Dict[str, Any]: 缓存统计信息
"""
return {
"total_cached_messages": self.cache_stats["total_cached_messages"],
"total_flushed_messages": self.cache_stats["total_flushed_messages"],
"active_caches": len(self.message_caches),
"cached_streams": len([s for s in self.message_caches.keys() if self.message_caches[s]]),
"processing_streams": len([s for s in self.stream_processing_status.keys() if self.stream_processing_status[s]]),
}
# 创建全局消息管理器实例
message_manager = MessageManager()