diff --git a/src/chat/memory_system/memory_system.py b/src/chat/memory_system/memory_system.py index 3422ca296..a4197756b 100644 --- a/src/chat/memory_system/memory_system.py +++ b/src/chat/memory_system/memory_system.py @@ -1741,10 +1741,8 @@ def get_memory_system() -> MemorySystem: if memory_system is None: logger.warning("Global memory_system is None. Creating new uninitialized instance. This might be a problem.") memory_system = MemorySystem() - logger.info(f"get_memory_system() called, returning instance with id: {id(memory_system)}") return memory_system - async def initialize_memory_system(llm_model: LLMRequest | None = None): """初始化全局记忆系统""" global memory_system diff --git a/src/chat/message_manager/adaptive_stream_manager.py b/src/chat/message_manager/adaptive_stream_manager.py deleted file mode 100644 index fa0a97de5..000000000 --- a/src/chat/message_manager/adaptive_stream_manager.py +++ /dev/null @@ -1,482 +0,0 @@ -""" -自适应流管理器 - 动态并发限制和异步流池管理 -根据系统负载和流优先级动态调整并发限制 -""" - -import asyncio -import time -from dataclasses import dataclass, field -from enum import Enum - -import psutil - -from src.common.logger import get_logger - -logger = get_logger("adaptive_stream_manager") - - -class StreamPriority(Enum): - """流优先级""" - - LOW = 1 - NORMAL = 2 - HIGH = 3 - CRITICAL = 4 - - -@dataclass -class SystemMetrics: - """系统指标""" - - cpu_usage: float = 0.0 - memory_usage: float = 0.0 - active_coroutines: int = 0 - event_loop_lag: float = 0.0 - timestamp: float = field(default_factory=time.time) - - -@dataclass -class StreamMetrics: - """流指标""" - - stream_id: str - priority: StreamPriority - message_rate: float = 0.0 # 消息速率(消息/分钟) - response_time: float = 0.0 # 平均响应时间 - last_activity: float = field(default_factory=time.time) - consecutive_failures: int = 0 - is_active: bool = True - - -class AdaptiveStreamManager: - """自适应流管理器""" - - def __init__( - self, - base_concurrent_limit: int = 50, - max_concurrent_limit: int = 200, - min_concurrent_limit: int = 10, - metrics_window: float = 60.0, # 指标窗口时间 - adjustment_interval: float = 30.0, # 调整间隔 - cpu_threshold_high: float = 0.8, # CPU高负载阈值 - cpu_threshold_low: float = 0.3, # CPU低负载阈值 - memory_threshold_high: float = 0.85, # 内存高负载阈值 - ): - self.base_concurrent_limit = base_concurrent_limit - self.max_concurrent_limit = max_concurrent_limit - self.min_concurrent_limit = min_concurrent_limit - self.metrics_window = metrics_window - self.adjustment_interval = adjustment_interval - self.cpu_threshold_high = cpu_threshold_high - self.cpu_threshold_low = cpu_threshold_low - self.memory_threshold_high = memory_threshold_high - - # 当前状态 - self.current_limit = base_concurrent_limit - self.active_streams: set[str] = set() - self.pending_streams: set[str] = set() - self.stream_metrics: dict[str, StreamMetrics] = {} - - # 异步信号量 - self.semaphore = asyncio.Semaphore(base_concurrent_limit) - self.priority_semaphore = asyncio.Semaphore(5) # 高优先级专用信号量 - - # 系统监控 - self.system_metrics: list[SystemMetrics] = [] - self.last_adjustment_time = 0.0 - - # 统计信息 - self.stats = { - "total_requests": 0, - "accepted_requests": 0, - "rejected_requests": 0, - "priority_accepts": 0, - "limit_adjustments": 0, - "avg_concurrent_streams": 0, - "peak_concurrent_streams": 0, - } - - # 监控任务 - self.monitor_task: asyncio.Task | None = None - self.adjustment_task: asyncio.Task | None = None - self.is_running = False - - logger.info(f"自适应流管理器初始化完成 (base_limit={base_concurrent_limit}, max_limit={max_concurrent_limit})") - - async def start(self): - """启动自适应管理器""" - if self.is_running: - 
logger.warning("自适应流管理器已经在运行") - return - - self.is_running = True - self.monitor_task = asyncio.create_task(self._system_monitor_loop(), name="system_monitor") - self.adjustment_task = asyncio.create_task(self._adjustment_loop(), name="limit_adjustment") - - async def stop(self): - """停止自适应管理器""" - if not self.is_running: - return - - self.is_running = False - - # 停止监控任务 - if self.monitor_task and not self.monitor_task.done(): - self.monitor_task.cancel() - try: - await asyncio.wait_for(self.monitor_task, timeout=10.0) - except asyncio.TimeoutError: - logger.warning("系统监控任务停止超时") - except Exception as e: - logger.error(f"停止系统监控任务时出错: {e}") - - if self.adjustment_task and not self.adjustment_task.done(): - self.adjustment_task.cancel() - try: - await asyncio.wait_for(self.adjustment_task, timeout=10.0) - except asyncio.TimeoutError: - logger.warning("限制调整任务停止超时") - except Exception as e: - logger.error(f"停止限制调整任务时出错: {e}") - - logger.info("自适应流管理器已停止") - - async def acquire_stream_slot( - self, stream_id: str, priority: StreamPriority = StreamPriority.NORMAL, force: bool = False - ) -> bool: - """ - 获取流处理槽位 - - Args: - stream_id: 流ID - priority: 优先级 - force: 是否强制获取(突破限制) - - Returns: - bool: 是否成功获取槽位 - """ - # 检查管理器是否已启动 - if not self.is_running: - logger.warning(f"自适应流管理器未运行,直接允许流 {stream_id}") - return True - - self.stats["total_requests"] += 1 - current_time = time.time() - - # 更新流指标 - if stream_id not in self.stream_metrics: - self.stream_metrics[stream_id] = StreamMetrics(stream_id=stream_id, priority=priority) - self.stream_metrics[stream_id].last_activity = current_time - - # 检查是否已经活跃 - if stream_id in self.active_streams: - logger.debug(f"流 {stream_id} 已经在活跃列表中") - return True - - # 优先级处理 - if priority in [StreamPriority.HIGH, StreamPriority.CRITICAL]: - return await self._acquire_priority_slot(stream_id, priority, force) - - # 检查是否需要强制分发(消息积压) - if not force and self._should_force_dispatch(stream_id): - force = True - logger.info(f"流 {stream_id} 消息积压严重,强制分发") - - # 尝试获取常规信号量 - try: - # 使用wait_for实现非阻塞获取 - acquired = await asyncio.wait_for(self.semaphore.acquire(), timeout=0.001) - if acquired: - self.active_streams.add(stream_id) - self.stats["accepted_requests"] += 1 - logger.debug(f"流 {stream_id} 获取常规槽位成功 (当前活跃: {len(self.active_streams)})") - return True - except asyncio.TimeoutError: - logger.debug(f"常规信号量已满: {stream_id}") - except Exception as e: - logger.warning(f"获取常规槽位时出错: {e}") - - # 如果强制分发,尝试突破限制 - if force: - return await self._force_acquire_slot(stream_id) - - # 无法获取槽位 - self.stats["rejected_requests"] += 1 - logger.debug(f"流 {stream_id} 获取槽位失败,当前限制: {self.current_limit}, 活跃流: {len(self.active_streams)}") - return False - - async def _acquire_priority_slot(self, stream_id: str, priority: StreamPriority, force: bool) -> bool: - """获取优先级槽位""" - try: - # 优先级信号量有少量槽位 - acquired = await asyncio.wait_for(self.priority_semaphore.acquire(), timeout=0.001) - if acquired: - self.active_streams.add(stream_id) - self.stats["priority_accepts"] += 1 - self.stats["accepted_requests"] += 1 - logger.debug(f"流 {stream_id} 获取优先级槽位成功 (优先级: {priority.name})") - return True - except asyncio.TimeoutError: - logger.debug(f"优先级信号量已满: {stream_id}") - except Exception as e: - logger.warning(f"获取优先级槽位时出错: {e}") - - # 如果优先级槽位也满了,检查是否强制 - if force or priority == StreamPriority.CRITICAL: - return await self._force_acquire_slot(stream_id) - - return False - - async def _force_acquire_slot(self, stream_id: str) -> bool: - """强制获取槽位(突破限制)""" - # 检查是否超过最大限制 - if len(self.active_streams) >= 
self.max_concurrent_limit: - logger.warning(f"达到最大并发限制 {self.max_concurrent_limit},无法为流 {stream_id} 强制分发") - return False - - # 强制添加到活跃列表 - self.active_streams.add(stream_id) - self.stats["accepted_requests"] += 1 - logger.warning(f"流 {stream_id} 突破并发限制强制分发 (当前活跃: {len(self.active_streams)})") - return True - - def release_stream_slot(self, stream_id: str): - """释放流处理槽位""" - if stream_id in self.active_streams: - self.active_streams.remove(stream_id) - - # 释放相应的信号量 - metrics = self.stream_metrics.get(stream_id) - if metrics and metrics.priority in [StreamPriority.HIGH, StreamPriority.CRITICAL]: - self.priority_semaphore.release() - else: - self.semaphore.release() - - logger.debug(f"流 {stream_id} 释放槽位 (当前活跃: {len(self.active_streams)})") - - def _should_force_dispatch(self, stream_id: str) -> bool: - """判断是否应该强制分发""" - # 这里可以实现基于消息积压的判断逻辑 - # 简化版本:基于流的历史活跃度和优先级 - metrics = self.stream_metrics.get(stream_id) - if not metrics: - return False - - # 如果是高优先级流,更容易强制分发 - if metrics.priority == StreamPriority.HIGH: - return True - - # 如果最近有活跃且响应时间较长,可能需要强制分发 - current_time = time.time() - if ( - current_time - metrics.last_activity < 300 # 5分钟内有活动 - and metrics.response_time > 5.0 - ): # 响应时间超过5秒 - return True - - return False - - async def _system_monitor_loop(self): - """系统监控循环""" - logger.info("系统监控循环启动") - - while self.is_running: - try: - await asyncio.sleep(5.0) # 每5秒监控一次 - await self._collect_system_metrics() - except asyncio.CancelledError: - logger.info("系统监控循环被取消") - break - except Exception as e: - logger.error(f"系统监控出错: {e}") - - logger.info("系统监控循环结束") - - async def _collect_system_metrics(self): - """收集系统指标""" - try: - # CPU使用率 - cpu_usage = psutil.cpu_percent(interval=None) / 100.0 - - # 内存使用率 - memory = psutil.virtual_memory() - memory_usage = memory.percent / 100.0 - - # 活跃协程数量 - try: - active_coroutines = len(asyncio.all_tasks()) - except: - active_coroutines = 0 - - # 事件循环延迟 - event_loop_lag = 0.0 - try: - asyncio.get_running_loop() - start_time = time.time() - await asyncio.sleep(0) - event_loop_lag = time.time() - start_time - except: - pass - - metrics = SystemMetrics( - cpu_usage=cpu_usage, - memory_usage=memory_usage, - active_coroutines=active_coroutines, - event_loop_lag=event_loop_lag, - timestamp=time.time(), - ) - - self.system_metrics.append(metrics) - - # 保持指标窗口大小 - cutoff_time = time.time() - self.metrics_window - self.system_metrics = [m for m in self.system_metrics if m.timestamp > cutoff_time] - - # 更新统计信息 - self.stats["avg_concurrent_streams"] = ( - self.stats["avg_concurrent_streams"] * 0.9 + len(self.active_streams) * 0.1 - ) - self.stats["peak_concurrent_streams"] = max(self.stats["peak_concurrent_streams"], len(self.active_streams)) - - except Exception as e: - logger.error(f"收集系统指标失败: {e}") - - async def _adjustment_loop(self): - """限制调整循环""" - logger.info("限制调整循环启动") - - while self.is_running: - try: - await asyncio.sleep(self.adjustment_interval) - await self._adjust_concurrent_limit() - except asyncio.CancelledError: - logger.info("限制调整循环被取消") - break - except Exception as e: - logger.error(f"限制调整出错: {e}") - - logger.info("限制调整循环结束") - - async def _adjust_concurrent_limit(self): - """调整并发限制""" - if not self.system_metrics: - return - - current_time = time.time() - if current_time - self.last_adjustment_time < self.adjustment_interval: - return - - # 计算平均系统指标 - recent_metrics = self.system_metrics[-10:] if len(self.system_metrics) >= 10 else self.system_metrics - if not recent_metrics: - return - - avg_cpu = sum(m.cpu_usage for m in recent_metrics) / 
len(recent_metrics) - avg_memory = sum(m.memory_usage for m in recent_metrics) / len(recent_metrics) - avg_coroutines = sum(m.active_coroutines for m in recent_metrics) / len(recent_metrics) - - # 调整策略 - old_limit = self.current_limit - adjustment_factor = 1.0 - - # CPU负载调整 - if avg_cpu > self.cpu_threshold_high: - adjustment_factor *= 0.8 # 减少20% - elif avg_cpu < self.cpu_threshold_low: - adjustment_factor *= 1.2 # 增加20% - - # 内存负载调整 - if avg_memory > self.memory_threshold_high: - adjustment_factor *= 0.7 # 减少30% - - # 协程数量调整 - if avg_coroutines > 1000: - adjustment_factor *= 0.9 # 减少10% - - # 应用调整 - new_limit = int(self.current_limit * adjustment_factor) - new_limit = max(self.min_concurrent_limit, min(self.max_concurrent_limit, new_limit)) - - # 检查是否需要调整信号量 - if new_limit != self.current_limit: - await self._adjust_semaphore(self.current_limit, new_limit) - self.current_limit = new_limit - self.stats["limit_adjustments"] += 1 - self.last_adjustment_time = current_time - - logger.info( - f"并发限制调整: {old_limit} -> {new_limit} " - f"(CPU: {avg_cpu:.2f}, 内存: {avg_memory:.2f}, 协程: {avg_coroutines:.0f})" - ) - - async def _adjust_semaphore(self, old_limit: int, new_limit: int): - """调整信号量大小""" - if new_limit > old_limit: - # 增加信号量槽位 - for _ in range(new_limit - old_limit): - self.semaphore.release() - elif new_limit < old_limit: - # 减少信号量槽位(通过等待槽位被释放) - reduction = old_limit - new_limit - for _ in range(reduction): - try: - await asyncio.wait_for(self.semaphore.acquire(), timeout=0.001) - except: - # 如果无法立即获取,说明当前使用量接近限制 - break - - def update_stream_metrics(self, stream_id: str, **kwargs): - """更新流指标""" - if stream_id not in self.stream_metrics: - return - - metrics = self.stream_metrics[stream_id] - for key, value in kwargs.items(): - if hasattr(metrics, key): - setattr(metrics, key, value) - - def get_stats(self) -> dict: - """获取统计信息""" - stats = self.stats.copy() - stats.update( - { - "current_limit": self.current_limit, - "active_streams": len(self.active_streams), - "pending_streams": len(self.pending_streams), - "is_running": self.is_running, - "system_cpu": self.system_metrics[-1].cpu_usage if self.system_metrics else 0, - "system_memory": self.system_metrics[-1].memory_usage if self.system_metrics else 0, - } - ) - - # 计算接受率 - if stats["total_requests"] > 0: - stats["acceptance_rate"] = stats["accepted_requests"] / stats["total_requests"] - else: - stats["acceptance_rate"] = 0 - - return stats - - -# 全局自适应管理器实例 -_adaptive_manager: AdaptiveStreamManager | None = None - - -def get_adaptive_stream_manager() -> AdaptiveStreamManager: - """获取自适应流管理器实例""" - global _adaptive_manager - if _adaptive_manager is None: - _adaptive_manager = AdaptiveStreamManager() - return _adaptive_manager - - -async def init_adaptive_stream_manager(): - """初始化自适应流管理器""" - manager = get_adaptive_stream_manager() - await manager.start() - - -async def shutdown_adaptive_stream_manager(): - """关闭自适应流管理器""" - manager = get_adaptive_stream_manager() - await manager.stop() diff --git a/src/chat/message_manager/context_manager.py b/src/chat/message_manager/context_manager.py index c569fde6b..aabf375fd 100644 --- a/src/chat/message_manager/context_manager.py +++ b/src/chat/message_manager/context_manager.py @@ -35,8 +35,14 @@ class SingleStreamContextManager: self.last_access_time = time.time() self.access_count = 0 self.total_messages = 0 + + # 标记是否已初始化历史消息 + self._history_initialized = False - logger.debug(f"单流上下文管理器初始化: {stream_id}") + logger.info(f"[新建] 单流上下文管理器初始化: {stream_id} (id={id(self)})") + + # 
异步初始化历史消息(不阻塞构造函数) + asyncio.create_task(self._initialize_history_from_db()) def get_context(self) -> StreamContext: """获取流上下文""" @@ -293,6 +299,59 @@ class SingleStreamContextManager: """更新访问统计""" self.last_access_time = time.time() self.access_count += 1 + + async def _initialize_history_from_db(self): + """从数据库初始化历史消息到context中""" + if self._history_initialized: + logger.info(f"历史消息已初始化,跳过: {self.stream_id}") + return + + # 立即设置标志,防止并发重复加载 + logger.info(f"设置历史初始化标志: {self.stream_id}") + self._history_initialized = True + + try: + logger.info(f"开始从数据库加载历史消息: {self.stream_id}") + + from src.chat.utils.chat_message_builder import get_raw_msg_before_timestamp_with_chat + + # 加载历史消息(限制数量为max_context_size的2倍,用于丰富上下文) + db_messages = await get_raw_msg_before_timestamp_with_chat( + chat_id=self.stream_id, + timestamp=time.time(), + limit=self.max_context_size * 2, + ) + + if db_messages: + # 将数据库消息转换为 DatabaseMessages 对象并添加到历史 + for msg_dict in db_messages: + try: + # 使用 ** 解包字典作为关键字参数 + db_msg = DatabaseMessages(**msg_dict) + + # 标记为已读 + db_msg.is_read = True + + # 添加到历史消息 + self.context.history_messages.append(db_msg) + + except Exception as e: + logger.warning(f"转换历史消息失败 (message_id={msg_dict.get('message_id', 'unknown')}): {e}") + continue + + logger.info(f"成功从数据库加载 {len(self.context.history_messages)} 条历史消息到内存: {self.stream_id}") + else: + logger.debug(f"没有历史消息需要加载: {self.stream_id}") + + except Exception as e: + logger.error(f"从数据库初始化历史消息失败: {self.stream_id}, {e}", exc_info=True) + # 加载失败时重置标志,允许重试 + self._history_initialized = False + + async def ensure_history_initialized(self): + """确保历史消息已初始化(供外部调用)""" + if not self._history_initialized: + await self._initialize_history_from_db() async def _calculate_message_interest(self, message: DatabaseMessages) -> float: """ diff --git a/src/chat/message_manager/distribution_manager.py b/src/chat/message_manager/distribution_manager.py index c111bf8b4..ec2de4c83 100644 --- a/src/chat/message_manager/distribution_manager.py +++ b/src/chat/message_manager/distribution_manager.py @@ -9,7 +9,6 @@ from typing import Any from src.chat.chatter_manager import ChatterManager from src.chat.energy_system import energy_manager -from src.chat.message_manager.adaptive_stream_manager import StreamPriority from src.common.data_models.message_manager_data_model import StreamContext from src.common.logger import get_logger from src.config.config import global_config @@ -117,31 +116,6 @@ class StreamLoopManager: logger.debug(f"流 {stream_id} 循环已在运行") return True - # 使用自适应流管理器获取槽位 - try: - from src.chat.message_manager.adaptive_stream_manager import get_adaptive_stream_manager - - adaptive_manager = get_adaptive_stream_manager() - - if adaptive_manager.is_running: - # 确定流优先级 - priority = self._determine_stream_priority(stream_id) - - # 获取处理槽位 - slot_acquired = await adaptive_manager.acquire_stream_slot( - stream_id=stream_id, priority=priority, force=force - ) - - if slot_acquired: - logger.debug(f"成功获取流处理槽位: {stream_id} (优先级: {priority.name})") - else: - logger.debug(f"自适应管理器拒绝槽位请求: {stream_id},尝试回退方案") - else: - logger.debug("自适应管理器未运行") - - except Exception as e: - logger.debug(f"自适应管理器获取槽位失败: {e}") - # 创建流循环任务 try: loop_task = asyncio.create_task(self._stream_loop_worker(stream_id), name=f"stream_loop_{stream_id}") @@ -158,35 +132,8 @@ class StreamLoopManager: except Exception as e: logger.error(f"启动流循环任务失败 {stream_id}: {e}") - # 释放槽位 - from src.chat.message_manager.adaptive_stream_manager import get_adaptive_stream_manager - - adaptive_manager = 
get_adaptive_stream_manager() - adaptive_manager.release_stream_slot(stream_id) - return False - def _determine_stream_priority(self, stream_id: str) -> "StreamPriority": - """确定流优先级""" - try: - from src.chat.message_manager.adaptive_stream_manager import StreamPriority - - # 这里可以基于流的历史数据、用户身份等确定优先级 - # 简化版本:基于流ID的哈希值分配优先级 - hash_value = hash(stream_id) % 10 - - if hash_value >= 8: # 20% 高优先级 - return StreamPriority.HIGH - elif hash_value >= 5: # 30% 中等优先级 - return StreamPriority.NORMAL - else: # 50% 低优先级 - return StreamPriority.LOW - - except Exception: - from src.chat.message_manager.adaptive_stream_manager import StreamPriority - - return StreamPriority.NORMAL - async def stop_stream_loop(self, stream_id: str) -> bool: """停止指定流的循环任务 @@ -248,19 +195,6 @@ class StreamLoopManager: unread_count = self._get_unread_count(context) force_dispatch = self._needs_force_dispatch_for_context(context, unread_count) - # 3. 更新自适应管理器指标 - try: - from src.chat.message_manager.adaptive_stream_manager import get_adaptive_stream_manager - - adaptive_manager = get_adaptive_stream_manager() - adaptive_manager.update_stream_metrics( - stream_id, - message_rate=unread_count / 5.0 if unread_count > 0 else 0.0, # 简化计算 - last_activity=time.time(), - ) - except Exception as e: - logger.debug(f"更新流指标失败: {e}") - has_messages = force_dispatch or await self._has_messages_to_process(context) if has_messages: @@ -313,16 +247,6 @@ class StreamLoopManager: except Exception as e: logger.debug(f"清理 StreamContext 任务记录失败: {e}") - # 释放自适应管理器的槽位 - try: - from src.chat.message_manager.adaptive_stream_manager import get_adaptive_stream_manager - - adaptive_manager = get_adaptive_stream_manager() - adaptive_manager.release_stream_slot(stream_id) - logger.debug(f"释放自适应流处理槽位: {stream_id}") - except Exception as e: - logger.debug(f"释放自适应流处理槽位失败: {e}") - # 清理间隔记录 self._last_intervals.pop(stream_id, None) diff --git a/src/chat/message_manager/message_manager.py b/src/chat/message_manager/message_manager.py index 2cdca1938..49a617730 100644 --- a/src/chat/message_manager/message_manager.py +++ b/src/chat/message_manager/message_manager.py @@ -74,18 +74,6 @@ class MessageManager: # 启动消息缓存系统(内置) logger.info("📦 消息缓存系统已启动") - # 启动自适应流管理器 - try: - from src.chat.message_manager.adaptive_stream_manager import init_adaptive_stream_manager - - await init_adaptive_stream_manager() - logger.info("🎯 自适应流管理器已启动") - except Exception as e: - logger.error(f"启动自适应流管理器失败: {e}") - - # 启动睡眠和唤醒管理器 - # 睡眠系统的定时任务启动移至 main.py - # 启动流循环管理器并设置chatter_manager await stream_loop_manager.start() stream_loop_manager.set_chatter_manager(self.chatter_manager) @@ -113,16 +101,6 @@ class MessageManager: self.stream_processing_status.clear() logger.info("📦 消息缓存系统已停止") - # 停止自适应流管理器 - try: - from src.chat.message_manager.adaptive_stream_manager import shutdown_adaptive_stream_manager - - await shutdown_adaptive_stream_manager() - logger.info("🎯 自适应流管理器已停止") - except Exception as e: - logger.error(f"停止自适应流管理器失败: {e}") - - # 停止流循环管理器 await stream_loop_manager.stop() diff --git a/src/chat/message_receive/bot.py b/src/chat/message_receive/bot.py index 32e8c90e5..4ff4626dd 100644 --- a/src/chat/message_receive/bot.py +++ b/src/chat/message_receive/bot.py @@ -301,67 +301,6 @@ class ChatBot: logger.error(f"处理命令时出错: {e}") return False, None, True # 出错时继续处理消息 - async def handle_notice_message(self, message: DatabaseMessages): - """处理notice消息 - - notice消息是系统事件通知(如禁言、戳一戳等),具有以下特点: - 1. 默认不触发聊天流程,只记录 - 2. 可通过配置开启触发聊天流程 - 3. 
会在提示词中展示 - - Args: - message: DatabaseMessages 对象 - - Returns: - bool: True表示notice已完整处理(需要存储并终止后续流程) - False表示不是notice或notice需要继续处理(触发聊天流程) - """ - # 检查是否是notice消息 - if message.is_notify: - logger.info(f"收到notice消息: {message.notice_type}") - - # 根据配置决定是否触发聊天流程 - if not global_config.notice.enable_notice_trigger_chat: - logger.debug("notice消息不触发聊天流程(配置已关闭),将存储后终止") - return True # 返回True:需要在调用处存储并终止 - else: - logger.debug("notice消息触发聊天流程(配置已开启),继续处理") - return False # 返回False:继续正常流程,作为普通消息处理 - - # 兼容旧的notice判断方式 - if message.message_id == "notice": - # 为 DatabaseMessages 设置 is_notify 运行时属性 - from src.chat.message_receive.message_processor import set_db_message_runtime_attr - set_db_message_runtime_attr(message, "is_notify", True) - logger.info("旧格式notice消息") - - # 同样根据配置决定 - if not global_config.notice.enable_notice_trigger_chat: - logger.debug("旧格式notice消息不触发聊天流程,将存储后终止") - return True # 需要存储并终止 - else: - logger.debug("旧格式notice消息触发聊天流程,继续处理") - return False # 继续正常流程 - - # DatabaseMessages 不再有 message_segment,适配器响应处理已在消息处理阶段完成 - # 这里保留逻辑以防万一,但实际上不会再执行到 - return False # 不是notice消息,继续正常流程 - - async def handle_adapter_response(self, message: DatabaseMessages): - """处理适配器命令响应 - - 注意: 此方法目前未被调用,但保留以备将来使用 - """ - try: - from src.plugin_system.apis.send_api import put_adapter_response - - # DatabaseMessages 使用 message_segments 字段存储消息段 - # 注意: 这可能需要根据实际使用情况进行调整 - logger.warning("handle_adapter_response 方法被调用,但目前未实现对 DatabaseMessages 的支持") - - except Exception as e: - logger.error(f"处理适配器响应时出错: {e}") - async def message_process(self, message_data: dict[str, Any]) -> None: """处理转化后的统一格式消息""" try: @@ -454,29 +393,6 @@ class ChatBot: logger.info(f"[硬编码过滤] 检测到媒体内容处理失败({processed_text}),消息被静默处理。") return - # 处理notice消息 - # notice_handled=True: 表示notice不触发聊天,需要在此存储并终止 - # notice_handled=False: 表示notice触发聊天或不是notice,继续正常流程 - notice_handled = await self.handle_notice_message(message) - if notice_handled: - # notice消息不触发聊天流程,在此进行存储和记录后终止 - try: - # message 已经是 DatabaseMessages,直接使用 - # 添加到message_manager(这会将notice添加到全局notice管理器) - await message_manager.add_message(chat.stream_id, message) - logger.info(f"✅ Notice消息已添加到message_manager: type={message.notice_type}, stream={chat.stream_id}") - - except Exception as e: - logger.error(f"Notice消息添加到message_manager失败: {e}", exc_info=True) - - # 存储notice消息到数据库(需要更新 storage.py 支持 DatabaseMessages) - # 暂时跳过存储,等待更新 storage.py - logger.debug("notice消息已添加到message_manager(存储功能待更新)") - return - - # 如果notice_handled=False,则继续执行后续流程 - # 对于启用触发聊天的notice,会在后续的正常流程中被存储和处理 - # 过滤检查 # DatabaseMessages 使用 display_message 作为原始消息表示 raw_text = message.display_message or message.processed_plain_text or "" diff --git a/src/chat/message_receive/chat_stream.py b/src/chat/message_receive/chat_stream.py index a850d56c9..8f6dd37e1 100644 --- a/src/chat/message_receive/chat_stream.py +++ b/src/chat/message_receive/chat_stream.py @@ -62,36 +62,6 @@ class ChatStream: self._focus_energy = 0.5 # 内部存储的focus_energy值 self.no_reply_consecutive = 0 - def __deepcopy__(self, memo): - """自定义深拷贝方法,避免复制不可序列化的 asyncio.Task 对象""" - import copy - - # 创建新的实例 - new_stream = ChatStream( - stream_id=self.stream_id, - platform=self.platform, - user_info=copy.deepcopy(self.user_info, memo), - group_info=copy.deepcopy(self.group_info, memo), - ) - - # 复制基本属性 - new_stream.create_time = self.create_time - new_stream.last_active_time = self.last_active_time - new_stream.sleep_pressure = self.sleep_pressure - new_stream.saved = self.saved - new_stream.base_interest_energy = self.base_interest_energy - 
new_stream._focus_energy = self._focus_energy - new_stream.no_reply_consecutive = self.no_reply_consecutive - - # 复制 context_manager(包含 stream_context) - new_stream.context_manager = copy.deepcopy(self.context_manager, memo) - - # 清理 processing_task(如果存在) - if hasattr(new_stream.context_manager.context, "processing_task"): - new_stream.context_manager.context.processing_task = None - - return new_stream - def to_dict(self) -> dict: """转换为字典格式""" return { @@ -461,8 +431,7 @@ class ChatManager: # 更新用户信息和群组信息 stream.update_active_time() - stream = copy.deepcopy(stream) # 返回副本以避免外部修改影响缓存 - if user_info and user_info.platform and user_info.user_id: + if user_info.platform and user_info.user_id: stream.user_info = user_info if group_info: stream.group_info = group_info @@ -530,7 +499,6 @@ class ChatManager: logger.error(f"获取或创建聊天流失败: {e}", exc_info=True) raise e - stream = copy.deepcopy(stream) from src.common.data_models.database_data_model import DatabaseMessages if stream_id in self.last_messages and isinstance(self.last_messages[stream_id], DatabaseMessages): @@ -539,11 +507,12 @@ class ChatManager: logger.debug(f"聊天流 {stream_id} 不在最后消息列表中,可能是新创建的") # 确保 ChatStream 有自己的 context_manager - if not hasattr(stream, "context_manager"): + if not hasattr(stream, "context_manager") or stream.context_manager is None: from src.chat.message_manager.context_manager import SingleStreamContextManager from src.common.data_models.message_manager_data_model import StreamContext from src.plugin_system.base.component_types import ChatMode, ChatType + logger.info(f"为 stream {stream_id} 创建新的 context_manager") stream.context_manager = SingleStreamContextManager( stream_id=stream_id, context=StreamContext( @@ -552,6 +521,8 @@ class ChatManager: chat_mode=ChatMode.NORMAL, ), ) + else: + logger.info(f"stream {stream_id} 已有 context_manager,跳过创建") # 保存到内存和数据库 self.streams[stream_id] = stream @@ -781,11 +752,12 @@ class ChatManager: # await stream.set_context(self.last_messages[stream.stream_id]) # 确保 ChatStream 有自己的 context_manager - if not hasattr(stream, "context_manager"): + if not hasattr(stream, "context_manager") or stream.context_manager is None: from src.chat.message_manager.context_manager import SingleStreamContextManager from src.common.data_models.message_manager_data_model import StreamContext from src.plugin_system.base.component_types import ChatMode, ChatType + logger.debug(f"为加载的 stream {stream.stream_id} 创建新的 context_manager") stream.context_manager = SingleStreamContextManager( stream_id=stream.stream_id, context=StreamContext( @@ -794,6 +766,8 @@ class ChatManager: chat_mode=ChatMode.NORMAL, ), ) + else: + logger.debug(f"加载的 stream {stream.stream_id} 已有 context_manager") except Exception as e: logger.error(f"从数据库加载所有聊天流失败 (SQLAlchemy): {e}", exc_info=True) diff --git a/src/chat/message_receive/message.py b/src/chat/message_receive/message.py index cdde98f8b..8286f5cfe 100644 --- a/src/chat/message_receive/message.py +++ b/src/chat/message_receive/message.py @@ -160,7 +160,7 @@ class MessageProcessBase(Message): return "[表情,网卡了加载不出来]" elif seg.type == "voice": # 检查消息是否由机器人自己发送 - # 检查消息是否由机器人自己发送 + # self.message_info 来自 MessageBase,指当前消息的信息 if self.message_info and self.message_info.user_info and str(self.message_info.user_info.user_id) == str(global_config.bot.qq_account): logger.info(f"检测到机器人自身发送的语音消息 (User ID: {self.message_info.user_info.user_id}),尝试从缓存获取文本。") if isinstance(seg.data, str): @@ -182,10 +182,24 @@ class MessageProcessBase(Message): return f"@{nickname}" return f"@{seg.data}" if 
isinstance(seg.data, str) else "@未知用户" elif seg.type == "reply": - if self.reply and hasattr(self.reply, "processed_plain_text"): - # print(f"self.reply.processed_plain_text: {self.reply.processed_plain_text}") - # print(f"reply: {self.reply}") - return f"[回复<{self.reply.message_info.user_info.user_nickname}({self.reply.message_info.user_info.user_id})> 的消息:{self.reply.processed_plain_text}]" # type: ignore + # 处理回复消息段 + if self.reply: + # 检查 reply 对象是否有必要的属性 + if hasattr(self.reply, "processed_plain_text") and self.reply.processed_plain_text: + # DatabaseMessages 使用 user_info 而不是 message_info.user_info + user_nickname = self.reply.user_info.user_nickname if self.reply.user_info else "未知用户" + user_id = self.reply.user_info.user_id if self.reply.user_info else "" + return f"[回复<{user_nickname}({user_id})> 的消息:{self.reply.processed_plain_text}]" + else: + # reply 对象存在但没有 processed_plain_text,返回简化的回复标识 + logger.debug(f"reply 消息段没有 processed_plain_text 属性,message_id: {getattr(self.reply, 'message_id', 'unknown')}") + return "[回复消息]" + else: + # 没有 reply 对象,但有 reply 消息段(可能是机器人自己发送的消息) + # 这种情况下 seg.data 应该包含被回复消息的 message_id + if isinstance(seg.data, str): + logger.debug(f"处理 reply 消息段,但 self.reply 为 None,reply_to message_id: {seg.data}") + return f"[回复消息 {seg.data}]" return None else: return f"[{seg.type}:{seg.data!s}]" diff --git a/src/chat/message_receive/message_processor.py b/src/chat/message_receive/message_processor.py index 5da582710..b6c66f144 100644 --- a/src/chat/message_receive/message_processor.py +++ b/src/chat/message_receive/message_processor.py @@ -79,13 +79,9 @@ async def process_message_from_dict(message_dict: dict[str, Any], stream_id: str group_name = group_info.group_name if group_info else None group_platform = group_info.platform if group_info else None - # 生成 chat_id - if group_info and group_id: - chat_id = f"{platform}_{group_id}" - elif user_info and user_info.user_id: - chat_id = f"{platform}_{user_info.user_id}_private" - else: - chat_id = stream_id + # chat_id 应该直接使用 stream_id(与数据库存储格式一致) + # stream_id 是通过 platform + user_id/group_id 的 SHA-256 哈希生成的 + chat_id = stream_id # 准备 additional_config additional_config_str = _prepare_additional_config(message_info, is_notify, is_public_notice, notice_type) diff --git a/src/chat/message_receive/message_recv_backup.py b/src/chat/message_receive/message_recv_backup.py deleted file mode 100644 index 3f1943ebf..000000000 --- a/src/chat/message_receive/message_recv_backup.py +++ /dev/null @@ -1,434 +0,0 @@ -# MessageRecv 类备份 - 已从 message.py 中移除 -# 备份日期: 2025-10-31 -# 此类已被 DatabaseMessages 完全取代 - -# MessageRecv 类已被移除 -# 现在所有消息处理都使用 DatabaseMessages -# 如果需要从消息字典创建 DatabaseMessages,请使用 message_processor.process_message_from_dict() -# -# 历史参考: MessageRecv 曾经是接收消息的包装类,现已被 DatabaseMessages 完全取代 -# 迁移完成日期: 2025-10-31 - -""" -# 以下是已删除的 MessageRecv 类(保留作为参考) -class MessageRecv: - 接收消息类 - DatabaseMessages 的轻量级包装器 - - 这个类现在主要作为适配器层,处理外部消息格式并内部使用 DatabaseMessages。 - 保留此类是为了向后兼容性和处理 message_segment 的异步逻辑。 -""" - - def __init__(self, message_dict: dict[str, Any]): - """从MessageCQ的字典初始化 - - Args: - message_dict: MessageCQ序列化后的字典 - """ - # 保留原始消息信息用于某些场景 - self.message_info = BaseMessageInfo.from_dict(message_dict.get("message_info", {})) - self.message_segment = Seg.from_dict(message_dict.get("message_segment", {})) - self.raw_message = message_dict.get("raw_message") - - # 处理状态(在process()之前临时使用) - self._processing_state = { - "is_emoji": False, - "has_emoji": False, - "is_picid": False, - "has_picid": False, - "is_voice": False, - 
"is_video": False, - "is_mentioned": None, - "is_at": False, - "priority_mode": "interest", - "priority_info": None, - } - - self.chat_stream = None - self.reply = None - self.processed_plain_text = message_dict.get("processed_plain_text", "") - - # 解析additional_config中的notice信息 - self.is_notify = False - self.is_public_notice = False - self.notice_type = None - if self.message_info.additional_config and isinstance(self.message_info.additional_config, dict): - self.is_notify = self.message_info.additional_config.get("is_notice", False) - self.is_public_notice = self.message_info.additional_config.get("is_public_notice", False) - self.notice_type = self.message_info.additional_config.get("notice_type") - - # 兼容性属性 - 代理到 _processing_state - @property - def is_emoji(self) -> bool: - return self._processing_state["is_emoji"] - - @is_emoji.setter - def is_emoji(self, value: bool): - self._processing_state["is_emoji"] = value - - @property - def has_emoji(self) -> bool: - return self._processing_state["has_emoji"] - - @has_emoji.setter - def has_emoji(self, value: bool): - self._processing_state["has_emoji"] = value - - @property - def is_picid(self) -> bool: - return self._processing_state["is_picid"] - - @is_picid.setter - def is_picid(self, value: bool): - self._processing_state["is_picid"] = value - - @property - def has_picid(self) -> bool: - return self._processing_state["has_picid"] - - @has_picid.setter - def has_picid(self, value: bool): - self._processing_state["has_picid"] = value - - @property - def is_voice(self) -> bool: - return self._processing_state["is_voice"] - - @is_voice.setter - def is_voice(self, value: bool): - self._processing_state["is_voice"] = value - - @property - def is_video(self) -> bool: - return self._processing_state["is_video"] - - @is_video.setter - def is_video(self, value: bool): - self._processing_state["is_video"] = value - - @property - def is_mentioned(self): - return self._processing_state["is_mentioned"] - - @is_mentioned.setter - def is_mentioned(self, value): - self._processing_state["is_mentioned"] = value - - @property - def is_at(self) -> bool: - return self._processing_state["is_at"] - - @is_at.setter - def is_at(self, value: bool): - self._processing_state["is_at"] = value - - @property - def priority_mode(self) -> str: - return self._processing_state["priority_mode"] - - @priority_mode.setter - def priority_mode(self, value: str): - self._processing_state["priority_mode"] = value - - @property - def priority_info(self): - return self._processing_state["priority_info"] - - @priority_info.setter - def priority_info(self, value): - self._processing_state["priority_info"] = value - - # 其他常用属性 - interest_value: float = 0.0 - is_command: bool = False - memorized_times: int = 0 - - def __post_init__(self): - """dataclass 初始化后处理""" - self.key_words = [] - self.key_words_lite = [] - - def update_chat_stream(self, chat_stream: "ChatStream"): - self.chat_stream = chat_stream - - def to_database_message(self) -> "DatabaseMessages": - """将 MessageRecv 转换为 DatabaseMessages 对象 - - Returns: - DatabaseMessages: 数据库消息对象 - """ - import time - - message_info = self.message_info - msg_user_info = getattr(message_info, "user_info", None) - stream_user_info = getattr(self.chat_stream, "user_info", None) if self.chat_stream else None - group_info = getattr(self.chat_stream, "group_info", None) if self.chat_stream else None - - message_id = message_info.message_id or "" - message_time = message_info.time if hasattr(message_info, "time") and message_info.time is not 
None else time.time() - is_mentioned = None - if isinstance(self.is_mentioned, bool): - is_mentioned = self.is_mentioned - elif isinstance(self.is_mentioned, int | float): - is_mentioned = self.is_mentioned != 0 - - # 提取用户信息 - user_id = "" - user_nickname = "" - user_cardname = None - user_platform = "" - if msg_user_info: - user_id = str(getattr(msg_user_info, "user_id", "") or "") - user_nickname = getattr(msg_user_info, "user_nickname", "") or "" - user_cardname = getattr(msg_user_info, "user_cardname", None) - user_platform = getattr(msg_user_info, "platform", "") or "" - elif stream_user_info: - user_id = str(getattr(stream_user_info, "user_id", "") or "") - user_nickname = getattr(stream_user_info, "user_nickname", "") or "" - user_cardname = getattr(stream_user_info, "user_cardname", None) - user_platform = getattr(stream_user_info, "platform", "") or "" - - # 提取聊天流信息 - chat_user_id = str(getattr(stream_user_info, "user_id", "") or "") if stream_user_info else "" - chat_user_nickname = getattr(stream_user_info, "user_nickname", "") or "" if stream_user_info else "" - chat_user_cardname = getattr(stream_user_info, "user_cardname", None) if stream_user_info else None - chat_user_platform = getattr(stream_user_info, "platform", "") or "" if stream_user_info else "" - - group_id = getattr(group_info, "group_id", None) if group_info else None - group_name = getattr(group_info, "group_name", None) if group_info else None - group_platform = getattr(group_info, "platform", None) if group_info else None - - # 准备 additional_config - additional_config_str = None - try: - import orjson - - additional_config_data = {} - - # 首先获取adapter传递的additional_config - if hasattr(message_info, 'additional_config') and message_info.additional_config: - if isinstance(message_info.additional_config, dict): - additional_config_data = message_info.additional_config.copy() - elif isinstance(message_info.additional_config, str): - try: - additional_config_data = orjson.loads(message_info.additional_config) - except Exception as e: - logger.warning(f"无法解析 additional_config JSON: {e}") - additional_config_data = {} - - # 添加notice相关标志 - if self.is_notify: - additional_config_data["is_notice"] = True - additional_config_data["notice_type"] = self.notice_type or "unknown" - additional_config_data["is_public_notice"] = bool(self.is_public_notice) - - # 添加format_info到additional_config中 - if hasattr(message_info, 'format_info') and message_info.format_info: - try: - format_info_dict = message_info.format_info.to_dict() - additional_config_data["format_info"] = format_info_dict - logger.debug(f"[message.py] 嵌入 format_info 到 additional_config: {format_info_dict}") - except Exception as e: - logger.warning(f"将 format_info 转换为字典失败: {e}") - - # 序列化为JSON字符串 - if additional_config_data: - additional_config_str = orjson.dumps(additional_config_data).decode("utf-8") - except Exception as e: - logger.error(f"准备 additional_config 失败: {e}") - - # 创建数据库消息对象 - db_message = DatabaseMessages( - message_id=message_id, - time=float(message_time), - chat_id=self.chat_stream.stream_id if self.chat_stream else "", - processed_plain_text=self.processed_plain_text, - display_message=self.processed_plain_text, - is_mentioned=is_mentioned, - is_at=bool(self.is_at) if self.is_at is not None else None, - is_emoji=bool(self.is_emoji), - is_picid=bool(self.is_picid), - is_command=bool(self.is_command), - is_notify=bool(self.is_notify), - is_public_notice=bool(self.is_public_notice), - notice_type=self.notice_type, - 
additional_config=additional_config_str, - user_id=user_id, - user_nickname=user_nickname, - user_cardname=user_cardname, - user_platform=user_platform, - chat_info_stream_id=self.chat_stream.stream_id if self.chat_stream else "", - chat_info_platform=self.chat_stream.platform if self.chat_stream else "", - chat_info_create_time=float(self.chat_stream.create_time) if self.chat_stream else 0.0, - chat_info_last_active_time=float(self.chat_stream.last_active_time) if self.chat_stream else 0.0, - chat_info_user_id=chat_user_id, - chat_info_user_nickname=chat_user_nickname, - chat_info_user_cardname=chat_user_cardname, - chat_info_user_platform=chat_user_platform, - chat_info_group_id=group_id, - chat_info_group_name=group_name, - chat_info_group_platform=group_platform, - ) - - # 同步兴趣度等衍生属性 - db_message.interest_value = getattr(self, "interest_value", 0.0) - setattr(db_message, "should_reply", getattr(self, "should_reply", False)) - setattr(db_message, "should_act", getattr(self, "should_act", False)) - - return db_message - - async def process(self) -> None: - """处理消息内容,生成纯文本和详细文本 - - 这个方法必须在创建实例后显式调用,因为它包含异步操作。 - """ - self.processed_plain_text = await self._process_message_segments(self.message_segment) - - async def _process_single_segment(self, segment: Seg) -> str: - """处理单个消息段 - - Args: - segment: 消息段 - - Returns: - str: 处理后的文本 - """ - try: - if segment.type == "text": - self.is_picid = False - self.is_emoji = False - self.is_video = False - return segment.data # type: ignore - elif segment.type == "at": - self.is_picid = False - self.is_emoji = False - self.is_video = False - # 处理at消息,格式为"昵称:QQ号" - if isinstance(segment.data, str) and ":" in segment.data: - nickname, qq_id = segment.data.split(":", 1) - return f"@{nickname}" - return f"@{segment.data}" if isinstance(segment.data, str) else "@未知用户" - elif segment.type == "image": - # 如果是base64图片数据 - if isinstance(segment.data, str): - self.has_picid = True - self.is_picid = True - self.is_emoji = False - self.is_video = False - image_manager = get_image_manager() - # print(f"segment.data: {segment.data}") - _, processed_text = await image_manager.process_image(segment.data) - return processed_text - return "[发了一张图片,网卡了加载不出来]" - elif segment.type == "emoji": - self.has_emoji = True - self.is_emoji = True - self.is_picid = False - self.is_voice = False - self.is_video = False - if isinstance(segment.data, str): - return await get_image_manager().get_emoji_description(segment.data) - return "[发了一个表情包,网卡了加载不出来]" - elif segment.type == "voice": - self.is_picid = False - self.is_emoji = False - self.is_voice = True - self.is_video = False - - # 检查消息是否由机器人自己发送 - if self.message_info and self.message_info.user_info and str(self.message_info.user_info.user_id) == str(global_config.bot.qq_account): - logger.info(f"检测到机器人自身发送的语音消息 (User ID: {self.message_info.user_info.user_id}),尝试从缓存获取文本。") - if isinstance(segment.data, str): - cached_text = consume_self_voice_text(segment.data) - if cached_text: - logger.info(f"成功从缓存中获取语音文本: '{cached_text[:70]}...'") - return f"[语音:{cached_text}]" - else: - logger.warning("机器人自身语音消息缓存未命中,将回退到标准语音识别。") - - # 标准语音识别流程 (也作为缓存未命中的后备方案) - if isinstance(segment.data, str): - return await get_voice_text(segment.data) - return "[发了一段语音,网卡了加载不出来]" - elif segment.type == "mention_bot": - self.is_picid = False - self.is_emoji = False - self.is_voice = False - self.is_video = False - self.is_mentioned = float(segment.data) # type: ignore - return "" - elif segment.type == "priority_info": - self.is_picid = False - 
self.is_emoji = False - self.is_voice = False - if isinstance(segment.data, dict): - # 处理优先级信息 - self.priority_mode = "priority" - self.priority_info = segment.data - """ - { - 'message_type': 'vip', # vip or normal - 'message_priority': 1.0, # 优先级,大为优先,float - } - """ - return "" - elif segment.type == "file": - if isinstance(segment.data, dict): - file_name = segment.data.get('name', '未知文件') - file_size = segment.data.get('size', '未知大小') - return f"[文件:{file_name} ({file_size}字节)]" - return "[收到一个文件]" - elif segment.type == "video": - self.is_picid = False - self.is_emoji = False - self.is_voice = False - self.is_video = True - logger.info(f"接收到视频消息,数据类型: {type(segment.data)}") - - # 检查视频分析功能是否可用 - if not is_video_analysis_available(): - logger.warning("⚠️ Rust视频处理模块不可用,跳过视频分析") - return "[视频]" - - if global_config.video_analysis.enable: - logger.info("已启用视频识别,开始识别") - if isinstance(segment.data, dict): - try: - # 从Adapter接收的视频数据 - video_base64 = segment.data.get("base64") - filename = segment.data.get("filename", "video.mp4") - - logger.info(f"视频文件名: {filename}") - logger.info(f"Base64数据长度: {len(video_base64) if video_base64 else 0}") - - if video_base64: - # 解码base64视频数据 - video_bytes = base64.b64decode(video_base64) - logger.info(f"解码后视频大小: {len(video_bytes)} 字节") - - # 使用video analyzer分析视频 - video_analyzer = get_video_analyzer() - result = await video_analyzer.analyze_video_from_bytes( - video_bytes, filename, prompt=global_config.video_analysis.batch_analysis_prompt - ) - - logger.info(f"视频分析结果: {result}") - - # 返回视频分析结果 - summary = result.get("summary", "") - if summary: - return f"[视频内容] {summary}" - else: - return "[已收到视频,但分析失败]" - else: - logger.warning("视频消息中没有base64数据") - return "[收到视频消息,但数据异常]" - except Exception as e: - logger.error(f"视频处理失败: {e!s}") - import traceback - - logger.error(f"错误详情: {traceback.format_exc()}") - return "[收到视频,但处理时出现错误]" - else: - logger.warning(f"视频消息数据不是字典格式: {type(segment.data)}") - return "[发了一个视频,但格式不支持]" - else: diff --git a/src/chat/replyer/default_generator.py b/src/chat/replyer/default_generator.py index 5a078a22a..87fd9d4c1 100644 --- a/src/chat/replyer/default_generator.py +++ b/src/chat/replyer/default_generator.py @@ -948,40 +948,24 @@ class DefaultReplyer: chat_stream = await chat_manager.get_stream(chat_id) if chat_stream: stream_context = chat_stream.context_manager - # 使用真正的已读和未读消息 - read_messages = stream_context.context.history_messages # 已读消息 + + # 确保历史消息已从数据库加载 + await stream_context.ensure_history_initialized() + + # 直接使用内存中的已读和未读消息,无需再查询数据库 + read_messages = stream_context.context.history_messages # 已读消息(已从数据库加载) unread_messages = stream_context.get_unread_messages() # 未读消息 # 构建已读历史消息 prompt read_history_prompt = "" - # 总是从数据库加载历史记录,并与会话历史合并 - logger.info("正在从数据库加载上下文并与会话历史合并...") - db_messages_raw = await get_raw_msg_before_timestamp_with_chat( - chat_id=chat_id, - timestamp=time.time(), - limit=global_config.chat.max_context_size, - ) - - # 合并和去重 - combined_messages = {} - # 首先添加数据库消息 - for msg in db_messages_raw: - if msg.get("message_id"): - combined_messages[msg["message_id"]] = msg - - # 然后用会话消息覆盖/添加,以确保它们是最新的 - for msg_obj in read_messages: - msg_dict = msg_obj.flatten() - if msg_dict.get("message_id"): - combined_messages[msg_dict["message_id"]] = msg_dict - - # 按时间排序 - sorted_messages = sorted(combined_messages.values(), key=lambda x: x.get("time", 0)) - - read_history_prompt = "" - if sorted_messages: - # 限制最终用于prompt的历史消息数量 - final_history = sorted_messages[-50:] + if read_messages: + # 将 DatabaseMessages 
对象转换为字典格式,以便使用 build_readable_messages + read_messages_dicts = [msg.flatten() for msg in read_messages] + + # 按时间排序并限制数量 + sorted_messages = sorted(read_messages_dicts, key=lambda x: x.get("time", 0)) + final_history = sorted_messages[-50:] # 限制最多50条 + read_content = await build_readable_messages( final_history, replace_bot_name=True, @@ -989,8 +973,10 @@ class DefaultReplyer: truncate=True, ) read_history_prompt = f"这是已读历史消息,仅作为当前聊天情景的参考:\n{read_content}" + logger.debug(f"使用内存中的 {len(final_history)} 条历史消息构建prompt") else: read_history_prompt = "暂无已读历史消息" + logger.debug("内存中没有历史消息") # 构建未读历史消息 prompt unread_history_prompt = "" @@ -1281,17 +1267,41 @@ class DefaultReplyer: action_descriptions += f"- {action_name}: {action_description}\n" action_descriptions += "\n" - message_list_before_now_long = await get_raw_msg_before_timestamp_with_chat( - chat_id=chat_id, - timestamp=time.time(), - limit=global_config.chat.max_context_size * 1, - ) - - message_list_before_short = await get_raw_msg_before_timestamp_with_chat( - chat_id=chat_id, - timestamp=time.time(), - limit=int(global_config.chat.max_context_size * 0.33), - ) + # 从内存获取历史消息,避免重复查询数据库 + from src.plugin_system.apis.chat_api import get_chat_manager + + chat_manager = get_chat_manager() + chat_stream_obj = await chat_manager.get_stream(chat_id) + + if chat_stream_obj: + # 确保历史消息已初始化 + await chat_stream_obj.context_manager.ensure_history_initialized() + + # 获取所有消息(历史+未读) + all_messages = ( + chat_stream_obj.context_manager.context.history_messages + + chat_stream_obj.context_manager.get_unread_messages() + ) + + # 转换为字典格式 + message_list_before_now_long = [msg.flatten() for msg in all_messages[-(global_config.chat.max_context_size * 2):]] + message_list_before_short = [msg.flatten() for msg in all_messages[-int(global_config.chat.max_context_size * 0.33):]] + + logger.debug(f"使用内存中的消息: long={len(message_list_before_now_long)}, short={len(message_list_before_short)}") + else: + # 回退到数据库查询 + logger.warning(f"无法获取chat_stream,回退到数据库查询: {chat_id}") + message_list_before_now_long = await get_raw_msg_before_timestamp_with_chat( + chat_id=chat_id, + timestamp=time.time(), + limit=global_config.chat.max_context_size * 2, + ) + message_list_before_short = await get_raw_msg_before_timestamp_with_chat( + chat_id=chat_id, + timestamp=time.time(), + limit=int(global_config.chat.max_context_size * 0.33), + ) + chat_talking_prompt_short = await build_readable_messages( message_list_before_short, replace_bot_name=True, @@ -1653,11 +1663,36 @@ class DefaultReplyer: else: mood_prompt = "" - message_list_before_now_half = await get_raw_msg_before_timestamp_with_chat( - chat_id=chat_id, - timestamp=time.time(), - limit=min(int(global_config.chat.max_context_size * 0.33), 15), - ) + # 从内存获取历史消息,避免重复查询数据库 + from src.plugin_system.apis.chat_api import get_chat_manager + + chat_manager = get_chat_manager() + chat_stream_obj = await chat_manager.get_stream(chat_id) + + if chat_stream_obj: + # 确保历史消息已初始化 + await chat_stream_obj.context_manager.ensure_history_initialized() + + # 获取所有消息(历史+未读) + all_messages = ( + chat_stream_obj.context_manager.context.history_messages + + chat_stream_obj.context_manager.get_unread_messages() + ) + + # 转换为字典格式,限制数量 + limit = min(int(global_config.chat.max_context_size * 0.33), 15) + message_list_before_now_half = [msg.flatten() for msg in all_messages[-limit:]] + + logger.debug(f"Rewrite使用内存中的 {len(message_list_before_now_half)} 条消息") + else: + # 回退到数据库查询 + logger.warning(f"无法获取chat_stream,回退到数据库查询: {chat_id}") + 
message_list_before_now_half = await get_raw_msg_before_timestamp_with_chat( + chat_id=chat_id, + timestamp=time.time(), + limit=min(int(global_config.chat.max_context_size * 0.33), 15), + ) + chat_talking_prompt_half = await build_readable_messages( message_list_before_now_half, replace_bot_name=True, @@ -2071,12 +2106,35 @@ class DefaultReplyer: memory_context = {key: value for key, value in memory_context.items() if value} - # 构建聊天历史用于存储 - message_list_before_short = await get_raw_msg_before_timestamp_with_chat( - chat_id=stream.stream_id, - timestamp=time.time(), - limit=int(global_config.chat.max_context_size * 0.33), - ) + # 从内存获取聊天历史用于存储,避免重复查询数据库 + from src.plugin_system.apis.chat_api import get_chat_manager + + chat_manager = get_chat_manager() + chat_stream_obj = await chat_manager.get_stream(stream.stream_id) + + if chat_stream_obj: + # 确保历史消息已初始化 + await chat_stream_obj.context_manager.ensure_history_initialized() + + # 获取所有消息(历史+未读) + all_messages = ( + chat_stream_obj.context_manager.context.history_messages + + chat_stream_obj.context_manager.get_unread_messages() + ) + + # 转换为字典格式,限制数量 + limit = int(global_config.chat.max_context_size * 0.33) + message_list_before_short = [msg.flatten() for msg in all_messages[-limit:]] + + logger.debug(f"记忆存储使用内存中的 {len(message_list_before_short)} 条消息") + else: + # 回退到数据库查询 + logger.warning(f"记忆存储:无法获取chat_stream,回退到数据库查询: {stream.stream_id}") + message_list_before_short = await get_raw_msg_before_timestamp_with_chat( + chat_id=stream.stream_id, + timestamp=time.time(), + limit=int(global_config.chat.max_context_size * 0.33), + ) chat_history = await build_readable_messages( message_list_before_short, replace_bot_name=True,
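
The context-manager and replyer changes above all follow the same load-once pattern: history is pulled from the database a single time per stream, cached in memory, and consumers slice the in-memory list instead of re-querying. Below is a minimal, self-contained sketch of that pattern, not the project's real API: the `fetch_history` coroutine, `Message` dataclass, and `StreamHistory` class are hypothetical placeholders standing in for `get_raw_msg_before_timestamp_with_chat`, `DatabaseMessages`, and `SingleStreamContextManager._initialize_history_from_db` / `ensure_history_initialized`.

    # Illustrative sketch only; names below are placeholders, not the project's real API.
    import asyncio
    import time
    from dataclasses import dataclass, field


    @dataclass
    class Message:
        message_id: str
        time: float
        text: str
        is_read: bool = False


    async def fetch_history(stream_id: str, before: float, limit: int) -> list[dict]:
        """Placeholder for a DB query; returns rows as dicts."""
        await asyncio.sleep(0)  # pretend I/O
        return [
            {"message_id": f"{stream_id}-{i}", "time": before - i, "text": f"msg {i}"}
            for i in range(limit)
        ]


    @dataclass
    class StreamHistory:
        stream_id: str
        max_context_size: int = 50
        history: list[Message] = field(default_factory=list)
        _initialized: bool = False

        async def ensure_initialized(self) -> None:
            if self._initialized:           # already loaded (or a load is in flight)
                return
            self._initialized = True        # flip the flag *before* awaiting to block duplicate loads
            try:
                rows = await fetch_history(self.stream_id, time.time(), self.max_context_size * 2)
                for row in rows:
                    msg = Message(**row)
                    msg.is_read = True      # historical messages count as already read
                    self.history.append(msg)
            except Exception:
                self._initialized = False   # reset on failure so a later call can retry
                raise

        def recent(self, limit: int) -> list[Message]:
            """Consumers slice the in-memory cache instead of re-querying the database."""
            return sorted(self.history, key=lambda m: m.time)[-limit:]


    async def main() -> None:
        ctx = StreamHistory("demo-stream")
        await ctx.ensure_initialized()      # cheap no-op on every call after the first
        print(len(ctx.recent(15)))


    if __name__ == "__main__":
        asyncio.run(main())

Because the flag is flipped synchronously before the first await, concurrent callers on the same event loop cannot start a second load without needing a lock, and resetting it in the error path keeps a failed load retryable, which mirrors the behaviour of the added `_initialize_history_from_db` code in this diff.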