"""
Stream loop manager.

Runs one independent, long-lived polling task per chat stream.  Each loop
periodically checks its stream for unread messages and, when there are any,
hands the stream context to the chatter manager for processing.  This
replaces the previous priority-queue based DistributionManager.
"""

import asyncio
import time
from typing import Any, Dict, Optional

from src.common.logger import get_logger
from src.config.config import global_config
from src.chat.energy_system import energy_manager
from src.chat.chatter_manager import ChatterManager
from src.plugin_system.apis.chat_api import get_chat_manager

logger = get_logger("stream_loop_manager")


class StreamLoopManager:
    """Stream loop manager -- one independent infinite-loop task per stream."""

    def __init__(self, max_concurrent_streams: Optional[int] = None):
        """Create the manager.

        Args:
            max_concurrent_streams: upper bound on simultaneously running
                stream loops; falls back to the chat config value
                ``max_concurrent_distributions`` (default 10).
        """
        # stream_id -> the asyncio.Task running that stream's loop.
        self.stream_loops: Dict[str, asyncio.Task] = {}
        # Guards registration/limit checks in stream_loops.  NOTE: the loop
        # tasks themselves must never block on this lock while a holder is
        # awaiting their cancellation (see _stream_loop's finally block).
        self.loop_lock = asyncio.Lock()

        # Aggregate counters; "active_streams" mirrors len(self.stream_loops).
        self.stats: Dict[str, Any] = {
            "active_streams": 0,
            "total_loops": 0,
            "total_process_cycles": 0,
            "total_failures": 0,
            "start_time": time.time(),
        }

        self.max_concurrent_streams = max_concurrent_streams or getattr(
            global_config.chat, "max_concurrent_distributions", 10
        )

        # Injected later via set_chatter_manager(); loops log-and-skip
        # processing until it is set.
        self.chatter_manager: Optional[ChatterManager] = None

        # When False, every stream loop exits its while-loop and cleans up.
        self.is_running = False

        logger.info(f"流循环管理器初始化完成 (最大并发流数: {self.max_concurrent_streams})")

    async def start(self) -> None:
        """Mark the manager as running; individual loops start lazily."""
        if self.is_running:
            logger.warning("流循环管理器已经在运行")
            return

        self.is_running = True
        logger.info("流循环管理器已启动")

    async def stop(self) -> None:
        """Stop the manager and cancel every running stream loop."""
        if not self.is_running:
            return

        # Flip the flag first so loops that are mid-cycle exit on their own.
        self.is_running = False

        async with self.loop_lock:
            for task in list(self.stream_loops.values()):
                if not task.done():
                    task.cancel()
                try:
                    # The cancelled loop's finally block deregisters itself
                    # WITHOUT taking loop_lock, so awaiting here cannot
                    # deadlock (see _stream_loop).
                    await task
                except asyncio.CancelledError:
                    pass
            self.stream_loops.clear()
            self.stats["active_streams"] = 0

        logger.info("流循环管理器已停止")

    async def start_stream_loop(self, stream_id: str) -> bool:
        """Start the polling loop for a stream (idempotent).

        Args:
            stream_id: stream identifier.

        Returns:
            bool: True if a loop is (now) running for the stream; False when
            the concurrency limit prevents starting a new one.
        """
        async with self.loop_lock:
            # Already running -> nothing to do.
            if stream_id in self.stream_loops:
                logger.debug(f"流 {stream_id} 循环已在运行")
                return True

            if len(self.stream_loops) >= self.max_concurrent_streams:
                logger.warning(f"超过最大并发流数限制,无法启动流 {stream_id}")
                return False

            task = asyncio.create_task(self._stream_loop(stream_id))
            self.stream_loops[stream_id] = task
            self.stats["total_loops"] += 1
            self.stats["active_streams"] = len(self.stream_loops)

            logger.info(f"启动流循环: {stream_id}")
            return True

    async def stop_stream_loop(self, stream_id: str) -> bool:
        """Cancel and deregister the loop for one stream.

        Args:
            stream_id: stream identifier.

        Returns:
            bool: True if a loop existed and was stopped, False otherwise.
        """
        async with self.loop_lock:
            if stream_id in self.stream_loops:
                task = self.stream_loops[stream_id]
                if not task.done():
                    task.cancel()
                    try:
                        await task
                    except asyncio.CancelledError:
                        pass
                # pop() instead of del: the loop's finally block may already
                # have deregistered itself while we awaited the cancellation.
                self.stream_loops.pop(stream_id, None)
                self.stats["active_streams"] = len(self.stream_loops)
                logger.info(f"停止流循环: {stream_id}")
                return True
            return False

    async def _stream_loop(self, stream_id: str) -> None:
        """Infinite poll/process/sleep loop for a single stream.

        Args:
            stream_id: stream identifier.
        """
        logger.info(f"流循环开始: {stream_id}")

        try:
            while self.is_running:
                try:
                    # 1. Resolve the stream's context; back off if the stream
                    #    is unknown (it may not be registered yet).
                    context = await self._get_stream_context(stream_id)
                    if not context:
                        logger.warning(f"无法获取流上下文: {stream_id}")
                        await asyncio.sleep(10.0)
                        continue

                    # 2. Anything to do this cycle?
                    has_messages = await self._has_messages_to_process(context)

                    if has_messages:
                        # 3. Hand off to the chatter manager.
                        success = await self._process_stream_messages(stream_id, context)

                        self.stats["total_process_cycles"] += 1
                        if success:
                            logger.debug(f"流处理成功: {stream_id}")
                        else:
                            self.stats["total_failures"] += 1
                            logger.warning(f"流处理失败: {stream_id}")

                    # 4. Dynamic interval, then 5. sleep until the next check.
                    interval = await self._calculate_interval(stream_id, has_messages)
                    logger.debug(f"流 {stream_id} 等待 {interval:.2f}s")
                    await asyncio.sleep(interval)

                except asyncio.CancelledError:
                    logger.info(f"流循环被取消: {stream_id}")
                    break
                except Exception as e:
                    logger.error(f"流循环出错 {stream_id}: {e}", exc_info=True)
                    self.stats["total_failures"] += 1
                    await asyncio.sleep(5.0)  # back off before retrying
        finally:
            # Deregister WITHOUT taking loop_lock: stop()/stop_stream_loop()
            # hold that lock while awaiting this task's cancellation, so
            # acquiring it here would deadlock.  The identity check makes
            # sure we only remove our own registration (a replacement loop
            # may already have been registered under the same stream_id).
            if self.stream_loops.get(stream_id) is asyncio.current_task():
                self.stream_loops.pop(stream_id, None)
                self.stats["active_streams"] = len(self.stream_loops)

        logger.info(f"流循环结束: {stream_id}")

    async def _get_stream_context(self, stream_id: str) -> Optional[Any]:
        """Fetch the stream's context object.

        Args:
            stream_id: stream identifier.

        Returns:
            Optional[Any]: the context, or None if the stream is unknown or
            the lookup fails.
        """
        try:
            chat_manager = get_chat_manager()
            chat_stream = chat_manager.get_stream(stream_id)
            if chat_stream:
                return chat_stream.context_manager.context
            return None
        except Exception as e:
            logger.error(f"获取流上下文失败 {stream_id}: {e}")
            return None

    async def _has_messages_to_process(self, context: Any) -> bool:
        """Check whether the context has anything pending.

        Args:
            context: stream context (duck-typed; attributes are optional).

        Returns:
            bool: True when unread or pending messages are present.
        """
        try:
            if hasattr(context, "unread_messages") and context.unread_messages:
                return True

            if hasattr(context, "has_pending_messages") and context.has_pending_messages:
                return True

            return False
        except Exception as e:
            logger.error(f"检查消息状态失败: {e}")
            return False

    async def _process_stream_messages(self, stream_id: str, context: Any) -> bool:
        """Run the chatter manager over the stream's context.

        Args:
            stream_id: stream identifier.
            context: stream context.

        Returns:
            bool: True if the chatter manager reported success.
        """
        if not self.chatter_manager:
            logger.warning(f"Chatter管理器未设置: {stream_id}")
            return False

        try:
            start_time = time.time()

            results = await self.chatter_manager.process_stream_context(stream_id, context)
            success = results.get("success", False)

            if success:
                process_time = time.time() - start_time
                logger.debug(f"流处理成功: {stream_id} (耗时: {process_time:.2f}s)")
            else:
                logger.warning(f"流处理失败: {stream_id} - {results.get('error_message', '未知错误')}")

            return success

        except Exception as e:
            logger.error(f"流处理异常: {stream_id} - {e}", exc_info=True)
            return False

    async def _calculate_interval(self, stream_id: str, has_messages: bool) -> float:
        """Compute the delay before the next poll of this stream.

        Args:
            stream_id: stream identifier.
            has_messages: whether this cycle had messages to process.

        Returns:
            float: seconds to sleep.
        """
        base_interval = getattr(global_config.chat, "distribution_interval", 5.0)

        # Idle streams are polled at half the rate.
        if not has_messages:
            return base_interval * 2.0

        try:
            # NOTE(review): reads energy_manager's internal cache directly;
            # assumes entries are (focus_energy, timestamp) tuples -- confirm
            # against energy_manager, or expose a public accessor.
            focus_energy = energy_manager.energy_cache.get(stream_id, (0.5, 0))[0]

            interval = energy_manager.get_distribution_interval(focus_energy)

            logger.debug(f"流 {stream_id} 动态间隔: {interval:.2f}s (能量: {focus_energy:.3f})")
            return interval

        except Exception as e:
            logger.debug(f"流 {stream_id} 使用默认间隔: {base_interval:.2f}s ({e})")
            return base_interval

    def get_queue_status(self) -> Dict[str, Any]:
        """Snapshot of the manager's current state.

        Returns:
            Dict[str, Any]: active-loop counts, limits, uptime and counters.
        """
        current_time = time.time()
        uptime = current_time - self.stats["start_time"] if self.is_running else 0

        return {
            "active_streams": len(self.stream_loops),
            "total_loops": self.stats["total_loops"],
            "max_concurrent": self.max_concurrent_streams,
            "is_running": self.is_running,
            "uptime": uptime,
            "total_process_cycles": self.stats["total_process_cycles"],
            "total_failures": self.stats["total_failures"],
            "stats": self.stats.copy(),
        }

    def set_chatter_manager(self, chatter_manager: ChatterManager) -> None:
        """Inject the chatter manager used to process stream contexts.

        Args:
            chatter_manager: chatter manager instance.
        """
        self.chatter_manager = chatter_manager
        logger.info(f"设置chatter管理器: {chatter_manager.__class__.__name__}")

    def get_performance_summary(self) -> Dict[str, Any]:
        """Aggregate throughput/failure summary since start.

        Returns:
            Dict[str, Any]: uptime, cycle/failure totals and hourly throughput.
        """
        current_time = time.time()
        uptime = current_time - self.stats["start_time"]

        # Processing cycles per hour of uptime (guarded against uptime ~ 0).
        throughput = self.stats["total_process_cycles"] / max(1, uptime / 3600)

        return {
            "uptime_hours": uptime / 3600,
            "active_streams": len(self.stream_loops),
            "total_process_cycles": self.stats["total_process_cycles"],
            "total_failures": self.stats["total_failures"],
            "throughput_per_hour": throughput,
            "max_concurrent_streams": self.max_concurrent_streams,
        }


# Global stream-loop-manager instance
stream_loop_manager = StreamLoopManager()
self.manager_task and not self.manager_task.done(): - self.manager_task.cancel() - + # 停止睡眠和唤醒管理器 await self.wakeup_manager.stop() - # await self.context_manager.stop() # 已删除,需要重构 - logger.info("消息管理器已停止") + # 停止流循环管理器 + await stream_loop_manager.stop() + + logger.info("🛑 消息管理器已停止 | 流循环管理器已停止") async def add_message(self, stream_id: str, message: DatabaseMessages): """添加消息到指定聊天流""" @@ -140,152 +143,6 @@ class MessageManager: except Exception as e: logger.error(f"为消息 {message_id} 添加动作时发生错误: {e}") - async def _manager_loop(self): - """管理器主循环 - 独立聊天流分发周期版本""" - while self.is_running: - try: - # 更新睡眠状态 - await self.sleep_manager.update_sleep_state(self.wakeup_manager) - - # 执行独立分发周期的检查 - await self._check_streams_with_individual_intervals() - - # 计算下次检查时间(使用最小间隔或固定间隔) - if global_config.chat.dynamic_distribution_enabled: - next_check_delay = self._calculate_next_manager_delay() - else: - next_check_delay = self.check_interval - - await asyncio.sleep(next_check_delay) - except asyncio.CancelledError: - break - except Exception as e: - logger.error(f"消息管理器循环出错: {e}") - traceback.print_exc() - - async def _check_all_streams(self): - """检查所有聊天流""" - active_streams = 0 - total_unread = 0 - - # 通过 ChatManager 获取所有活跃的流 - try: - chat_manager = get_chat_manager() - active_stream_ids = list(chat_manager.streams.keys()) - - for stream_id in active_stream_ids: - chat_stream = chat_manager.get_stream(stream_id) - if not chat_stream: - continue - - # 检查流是否活跃 - context = chat_stream.stream_context - if not context.is_active: - continue - - active_streams += 1 - - # 检查是否有未读消息 - unread_messages = chat_stream.context_manager.get_unread_messages() - if unread_messages: - total_unread += len(unread_messages) - - # 如果没有处理任务,创建一个 - if not hasattr(context, 'processing_task') or not context.processing_task or context.processing_task.done(): - context.processing_task = asyncio.create_task(self._process_stream_messages(stream_id)) - - # 更新统计 - self.stats.active_streams = active_streams - 
self.stats.total_unread_messages = total_unread - - except Exception as e: - logger.error(f"检查所有聊天流时发生错误: {e}") - - async def _process_stream_messages(self, stream_id: str): - """处理指定聊天流的消息""" - try: - # 通过 ChatManager 获取 ChatStream - chat_manager = get_chat_manager() - chat_stream = chat_manager.get_stream(stream_id) - if not chat_stream: - logger.warning(f"处理消息失败: 聊天流 {stream_id} 不存在") - return - - context = chat_stream.stream_context - - # 获取未读消息 - unread_messages = chat_stream.context_manager.get_unread_messages() - if not unread_messages: - return - - # 检查是否需要打断现有处理 - await self._check_and_handle_interruption(context, stream_id) - - # --- 睡眠状态检查 --- - if self.sleep_manager.is_sleeping(): - logger.info(f"Bot正在睡觉,检查聊天流 {stream_id} 是否有唤醒触发器。") - - was_woken_up = False - is_private = context.is_private_chat() - - for message in unread_messages: - is_mentioned = message.is_mentioned or False - if not is_mentioned and not is_private: - bot_names = [global_config.bot.nickname] + global_config.bot.alias_names - if any(name in message.processed_plain_text for name in bot_names): - is_mentioned = True - logger.debug(f"通过关键词 '{next((name for name in bot_names if name in message.processed_plain_text), '')}' 匹配将消息标记为 'is_mentioned'") - - if is_private or is_mentioned: - if self.wakeup_manager.add_wakeup_value(is_private, is_mentioned, chat_id=stream_id): - was_woken_up = True - break # 一旦被吵醒,就跳出循环并处理消息 - - if not was_woken_up: - logger.debug(f"聊天流 {stream_id} 中没有唤醒触发器,保持消息未读状态。") - return # 退出,不处理消息 - - logger.info(f"Bot被聊天流 {stream_id} 中的消息吵醒,继续处理。") - elif self.sleep_manager.is_woken_up(): - angry_chat_id = self.wakeup_manager.angry_chat_id - if stream_id != angry_chat_id: - logger.debug(f"Bot处于WOKEN_UP状态,但当前流 {stream_id} 不是触发唤醒的流 {angry_chat_id},跳过处理。") - return # 退出,不处理此流的消息 - logger.info(f"Bot处于WOKEN_UP状态,处理触发唤醒的流 {stream_id}。") - # --- 睡眠状态检查结束 --- - - logger.debug(f"开始处理聊天流 {stream_id} 的 {len(unread_messages)} 条未读消息") - - # 直接使用StreamContext对象进行处理 - if 
unread_messages: - try: - # 记录当前chat type用于调试 - logger.debug(f"聊天流 {stream_id} 检测到的chat type: {context.chat_type.value}") - - # 发送到chatter manager,传递StreamContext对象 - results = await self.chatter_manager.process_stream_context(stream_id, context) - - # 处理结果,标记消息为已读 - if results.get("success", False): - self._clear_all_unread_messages(stream_id) - logger.debug(f"聊天流 {stream_id} 处理成功,清除了 {len(unread_messages)} 条未读消息") - else: - logger.warning(f"聊天流 {stream_id} 处理失败: {results.get('error_message', '未知错误')}") - - except Exception as e: - logger.error(f"处理聊天流 {stream_id} 时发生异常,将清除所有未读消息: {e}") - # 出现异常时也清除未读消息,避免重复处理 - self._clear_all_unread_messages(stream_id) - raise - - logger.debug(f"聊天流 {stream_id} 消息处理完成") - - except asyncio.CancelledError: - raise - except Exception as e: - logger.error(f"处理聊天流 {stream_id} 消息时出错: {e}") - traceback.print_exc() - def deactivate_stream(self, stream_id: str): """停用聊天流""" try: @@ -431,211 +288,6 @@ class MessageManager: else: logger.debug(f"聊天流 {stream_id} 未触发打断,打断概率: {interruption_probability:.2f}") - def _calculate_stream_distribution_interval(self, context: StreamContext) -> float: - """计算单个聊天流的分发周期 - 使用重构后的能量管理器""" - if not global_config.chat.dynamic_distribution_enabled: - return self.check_interval # 使用固定间隔 - - try: - from src.chat.energy_system import energy_manager - from src.plugin_system.apis.chat_api import get_chat_manager - - # 获取聊天流和能量 - chat_stream = get_chat_manager().get_stream(context.stream_id) - if chat_stream: - focus_energy = chat_stream.focus_energy - # 使用能量管理器获取分发周期 - interval = energy_manager.get_distribution_interval(focus_energy) - logger.debug(f"流 {context.stream_id} 分发周期: {interval:.2f}s (能量: {focus_energy:.3f})") - return interval - else: - # 默认间隔 - return self.check_interval - - except Exception as e: - logger.error(f"计算分发周期失败: {e}") - return self.check_interval - - def _calculate_next_manager_delay(self) -> float: - """计算管理器下次检查的延迟时间""" - current_time = time.time() - min_delay = float("inf") - - # 
找到最近需要检查的流 - try: - chat_manager = get_chat_manager() - for _stream_id, chat_stream in chat_manager.streams.items(): - context = chat_stream.stream_context - if not context or not context.is_active: - continue - - time_until_check = context.next_check_time - current_time - if time_until_check > 0: - min_delay = min(min_delay, time_until_check) - else: - min_delay = 0.1 # 立即检查 - break - - # 如果没有活跃流,使用默认间隔 - if min_delay == float("inf"): - return self.check_interval - - # 确保最小延迟 - return max(0.1, min(min_delay, self.check_interval)) - - except Exception as e: - logger.error(f"计算下次检查延迟时发生错误: {e}") - return self.check_interval - - async def _check_streams_with_individual_intervals(self): - """检查所有达到检查时间的聊天流""" - current_time = time.time() - processed_streams = 0 - - # 通过 ChatManager 获取活跃的流 - try: - chat_manager = get_chat_manager() - for stream_id, chat_stream in chat_manager.streams.items(): - context = chat_stream.stream_context - if not context or not context.is_active: - continue - - # 检查是否达到检查时间 - if current_time >= context.next_check_time: - # 更新检查时间 - context.last_check_time = current_time - - # 计算下次检查时间和分发周期 - if global_config.chat.dynamic_distribution_enabled: - context.distribution_interval = self._calculate_stream_distribution_interval(context) - else: - context.distribution_interval = self.check_interval - - # 设置下次检查时间 - context.next_check_time = current_time + context.distribution_interval - - # 检查未读消息 - unread_messages = chat_stream.context_manager.get_unread_messages() - if unread_messages: - processed_streams += 1 - self.stats.total_unread_messages = len(unread_messages) - - # 如果没有处理任务,创建一个 - if not context.processing_task or context.processing_task.done(): - focus_energy = chat_stream.focus_energy - - # 根据优先级记录日志 - if focus_energy >= 0.7: - logger.info( - f"高优先级流 {stream_id} 开始处理 | " - f"focus_energy: {focus_energy:.3f} | " - f"分发周期: {context.distribution_interval:.2f}s | " - f"未读消息: {len(unread_messages)}" - ) - else: - logger.debug( - f"流 {stream_id} 
开始处理 | " - f"focus_energy: {focus_energy:.3f} | " - f"分发周期: {context.distribution_interval:.2f}s" - ) - - context.processing_task = asyncio.create_task(self._process_stream_messages(stream_id)) - - except Exception as e: - logger.error(f"检查独立分发周期的聊天流时发生错误: {e}") - - # 更新活跃流计数 - try: - chat_manager = get_chat_manager() - active_count = len([s for s in chat_manager.streams.values() if s.stream_context.is_active]) - self.stats.active_streams = active_count - - if processed_streams > 0: - logger.debug(f"本次循环处理了 {processed_streams} 个流 | 活跃流总数: {active_count}") - except Exception as e: - logger.error(f"更新活跃流计数时发生错误: {e}") - - async def _check_all_streams_with_priority(self): - """按优先级检查所有聊天流,高focus_energy的流优先处理""" - try: - chat_manager = get_chat_manager() - if not chat_manager.streams: - return - - # 获取活跃的聊天流并按focus_energy排序 - active_streams = [] - for stream_id, chat_stream in chat_manager.streams.items(): - context = chat_stream.stream_context - if not context or not context.is_active: - continue - - # 获取focus_energy - focus_energy = chat_stream.focus_energy - - # 计算流优先级分数 - priority_score = self._calculate_stream_priority(context, focus_energy) - active_streams.append((priority_score, stream_id, context)) - - except Exception as e: - logger.error(f"获取活跃流列表时发生错误: {e}") - return - - # 按优先级降序排序 - active_streams.sort(reverse=True, key=lambda x: x[0]) - - # 处理排序后的流 - active_stream_count = 0 - total_unread = 0 - - for priority_score, stream_id, context in active_streams: - active_stream_count += 1 - - # 检查是否有未读消息 - try: - chat_stream = chat_manager.get_stream(stream_id) - if not chat_stream: - continue - - unread_messages = chat_stream.context_manager.get_unread_messages() - if unread_messages: - total_unread += len(unread_messages) - - # 如果没有处理任务,创建一个 - if not hasattr(context, 'processing_task') or not context.processing_task or context.processing_task.done(): - context.processing_task = asyncio.create_task(self._process_stream_messages(stream_id)) - - # 高优先级流的额外日志 - if 
priority_score > 0.7: - logger.info( - f"高优先级流 {stream_id} 开始处理 | " - f"优先级: {priority_score:.3f} | " - f"未读消息: {len(unread_messages)}" - ) - except Exception as e: - logger.error(f"处理流 {stream_id} 的未读消息时发生错误: {e}") - continue - - # 更新统计 - self.stats.active_streams = active_stream_count - self.stats.total_unread_messages = total_unread - - def _calculate_stream_priority(self, context: StreamContext, focus_energy: float) -> float: - """计算聊天流的优先级分数 - 简化版本,主要使用focus_energy""" - # 使用重构后的能量管理器,主要依赖focus_energy - base_priority = focus_energy - - # 简单的未读消息加权 - unread_count = len(context.get_unread_messages()) - message_bonus = min(unread_count * 0.05, 0.2) # 最多20%加成 - - # 简单的时间加权 - current_time = time.time() - time_since_active = current_time - context.last_check_time - time_bonus = max(0, 1.0 - time_since_active / 7200.0) * 0.1 # 2小时内衰减 - - final_priority = base_priority + message_bonus + time_bonus - return max(0.0, min(1.0, final_priority)) - def _clear_all_unread_messages(self, stream_id: str): """清除指定上下文中的所有未读消息,防止意外情况导致消息一直未读""" try: diff --git a/src/plugins/built_in/affinity_flow_chatter/plan_filter.py b/src/plugins/built_in/affinity_flow_chatter/plan_filter.py index d567cb67a..b6f7ae836 100644 --- a/src/plugins/built_in/affinity_flow_chatter/plan_filter.py +++ b/src/plugins/built_in/affinity_flow_chatter/plan_filter.py @@ -401,13 +401,31 @@ class ChatterPlanFilter: for msg_dict in messages: try: # 将字典转换为DatabaseMessages对象 - db_message = DatabaseMessages( - message_id=msg_dict.get("message_id", ""), - user_info=msg_dict.get("user_info", {}), - processed_plain_text=msg_dict.get("processed_plain_text", ""), - key_words=msg_dict.get("key_words", "[]"), - is_mentioned=msg_dict.get("is_mentioned", False) - ) + # 处理两种可能的数据格式:flatten()返回的平铺字段 或 包含user_info字段的字典 + user_info_dict = msg_dict.get("user_info", {}) + if isinstance(user_info_dict, dict) and user_info_dict: + # 如果有user_info字段,使用它 + db_message = DatabaseMessages( + message_id=msg_dict.get("message_id", ""), + 
user_id=user_info_dict.get("user_id", ""), + user_nickname=user_info_dict.get("user_nickname", ""), + user_platform=user_info_dict.get("platform", ""), + processed_plain_text=msg_dict.get("processed_plain_text", ""), + key_words=msg_dict.get("key_words", "[]"), + is_mentioned=msg_dict.get("is_mentioned", False), + **{"user_info": user_info_dict} # 通过kwargs传入user_info + ) + else: + # 如果没有user_info字段,使用平铺的字段(flatten()方法返回的格式) + db_message = DatabaseMessages( + message_id=msg_dict.get("message_id", ""), + user_id=msg_dict.get("user_id", ""), + user_nickname=msg_dict.get("user_nickname", ""), + user_platform=msg_dict.get("user_platform", ""), + processed_plain_text=msg_dict.get("processed_plain_text", ""), + key_words=msg_dict.get("key_words", "[]"), + is_mentioned=msg_dict.get("is_mentioned", False) + ) # 计算消息兴趣度 interest_score_obj = await chatter_interest_scoring_system._calculate_single_message_score(