refactor: 将流循环管理器替换为调度器分发器以处理消息

- 引入SchedulerDispatcher以通过统一调度器管理消息分发。
- 更新了MessageManager以使用新的调度器,移除了旧的流循环管理功能。
- 增强了 add_message 方法,以便在接收消息时通知调度器。
- 已移除废弃的中断处理方法,将其逻辑整合至调度器中。
- 修改了action_manager,改为等待数据库操作而非使用asyncio.create_task以实现更精细的控制。
- 优化了统一调度器,支持并发任务执行及运行中任务的取消。
- 为重构流程和新架构添加了全面的文档说明。
This commit is contained in:
Windpicker-owo
2025-11-04 23:13:52 +08:00
parent 0abf76a688
commit 959143155f
8 changed files with 906 additions and 885 deletions

View File

@@ -0,0 +1,210 @@
# 消息分发器重构文档
## 重构日期
2025-11-04
## 重构目标
将基于异步任务循环的消息分发机制改为使用统一的 `unified_scheduler`,实现更优雅和可维护的消息处理流程。
## 重构内容
### 1. 修改 unified_scheduler 以支持完全并发执行
**文件**: `src/schedule/unified_scheduler.py`
**主要改动**:
- 修改 `_check_and_trigger_tasks` 方法,使用 `asyncio.create_task` 为每个到期任务创建独立的异步任务
- 新增 `_execute_task_callback` 方法,用于并发执行单个任务
- 使用 `asyncio.gather` 并发等待所有任务完成,确保不同 schedule 之间完全异步执行,不会相互阻塞
**关键改进**:
```python
# 为每个任务创建独立的异步任务,确保并发执行
execution_tasks = []
for task in tasks_to_trigger:
execution_task = asyncio.create_task(
self._execute_task_callback(task, current_time),
name=f"execute_{task.task_name}"
)
execution_tasks.append(execution_task)
# 等待所有任务完成(使用 return_exceptions=True 避免单个任务失败影响其他任务)
results = await asyncio.gather(*execution_tasks, return_exceptions=True)
```
### 2. 创建新的 SchedulerDispatcher
**文件**: `src/chat/message_manager/scheduler_dispatcher.py`
**功能**:
基于 `unified_scheduler` 的消息分发器,替代原有的 `stream_loop_task` 循环机制。
**工作流程**:
1. **接收消息时**: 将消息添加到聊天流上下文(缓存)
2. **检查 schedule**: 查看该聊天流是否有活跃的 schedule
3. **打断判定**: 如果有活跃 schedule检查是否需要打断
- 如果需要打断,移除旧 schedule 并创建新的
- 如果不需要打断,保持原有 schedule
4. **创建 schedule**: 如果没有活跃 schedule创建新的
5. **Schedule 触发**: 当 schedule 到期时,激活 chatter 进行处理
6. **处理完成**: 计算下次间隔并根据需要注册新的 schedule
**关键方法**:
- `on_message_received(stream_id)`: 消息接收时的处理入口
- `_check_interruption(stream_id, context)`: 检查是否应该打断
- `_create_schedule(stream_id, context)`: 创建新的 schedule
- `_cancel_and_recreate_schedule(stream_id, context)`: 取消并重新创建 schedule
- `_on_schedule_triggered(stream_id)`: schedule 触发时的回调
- `_process_stream(stream_id, context)`: 激活 chatter 处理消息
### 3. 修改 MessageManager 集成新分发器
**文件**: `src/chat/message_manager/message_manager.py`
**主要改动**:
1. 导入 `scheduler_dispatcher`
2. 启动时初始化 `scheduler_dispatcher` 而非 `stream_loop_manager`
3. 修改 `add_message` 方法:
- 将消息添加到上下文后
- 调用 `scheduler_dispatcher.on_message_received(stream_id)` 处理消息接收事件
4. 废弃 `_check_and_handle_interruption` 方法(打断逻辑已集成到 dispatcher
**新的消息接收流程**:
```python
async def add_message(self, stream_id: str, message: DatabaseMessages):
# 1. 检查 notice 消息
if self._is_notice_message(message):
await self._handle_notice_message(stream_id, message)
if not global_config.notice.enable_notice_trigger_chat:
return
# 2. 将消息添加到上下文
chat_stream = await chat_manager.get_stream(stream_id)
await chat_stream.context_manager.add_message(message)
# 3. 通知 scheduler_dispatcher 处理
await scheduler_dispatcher.on_message_received(stream_id)
```
### 4. 更新模块导出
**文件**: `src/chat/message_manager/__init__.py`
**改动**:
- 导出 `SchedulerDispatcher``scheduler_dispatcher`
## 架构对比
### 旧架构 (基于 stream_loop_task)
```
消息到达 -> add_message -> 添加到上下文 -> 检查打断 -> 取消 stream_loop_task
-> 重新创建 stream_loop_task
stream_loop_task: while True:
检查未读消息 -> 处理消息 -> 计算间隔 -> sleep(间隔)
```
**问题**:
- 每个聊天流维护一个独立的异步循环任务
- 即使没有消息也需要持续轮询
- 打断逻辑通过取消和重建任务实现,较为复杂
- 难以统一管理和监控
### 新架构 (基于 unified_scheduler)
```
消息到达 -> add_message -> 添加到上下文 -> dispatcher.on_message_received
-> 检查是否有活跃 schedule
-> 打断判定
-> 创建/更新 schedule
schedule 到期 -> _on_schedule_triggered -> 处理消息 -> 计算间隔 -> 创建新 schedule (如果需要)
```
**优势**:
- 使用统一的调度器管理所有聊天流
- 按需创建 schedule没有消息时不会创建
- 打断逻辑清晰:移除旧 schedule + 创建新 schedule
- 易于监控和统计(统一的 scheduler 统计)
- 完全异步并发,多个 schedule 可以同时触发而不相互阻塞
## 兼容性
### 保留的组件
- `stream_loop_manager`: 暂时保留但不启动,以便需要时回滚
- `_check_and_handle_interruption`: 保留方法签名但不执行,避免破坏现有调用
### 移除的组件
- 无(本次重构采用渐进式方式,先添加新功能,待稳定后再移除旧代码)
## 配置项
所有配置项保持不变,新分发器完全兼容现有配置:
- `chat.interruption_enabled`: 是否启用打断
- `chat.allow_reply_interruption`: 是否允许回复时打断
- `chat.interruption_max_limit`: 最大打断次数
- `chat.distribution_interval`: 基础分发间隔
- `chat.force_dispatch_unread_threshold`: 强制分发阈值
- `chat.force_dispatch_min_interval`: 强制分发最小间隔
## 测试建议
1. **基本功能测试**
- 单个聊天流接收消息并正常处理
- 多个聊天流同时接收消息并并发处理
2. **打断测试**
- 在 chatter 处理过程中发送新消息,验证打断逻辑
- 验证打断次数限制
- 验证打断概率计算
3. **间隔计算测试**
- 验证基于能量的动态间隔计算
- 验证强制分发阈值触发
4. **并发测试**
- 多个聊天流的 schedule 同时到期,验证并发执行
- 验证不同 schedule 之间不会相互阻塞
5. **长时间稳定性测试**
- 运行较长时间,观察是否有内存泄漏
- 观察 schedule 创建和销毁是否正常
## 回滚方案
如果新机制出现问题,可以通过以下步骤回滚:
1.`message_manager.py``start()` 方法中:
```python
# 注释掉新分发器
# await scheduler_dispatcher.start()
# scheduler_dispatcher.set_chatter_manager(self.chatter_manager)
# 启用旧分发器
await stream_loop_manager.start()
stream_loop_manager.set_chatter_manager(self.chatter_manager)
```
2. 在 `add_message()` 方法中:
```python
# 注释掉新逻辑
# await scheduler_dispatcher.on_message_received(stream_id)
# 恢复旧逻辑
await self._check_and_handle_interruption(chat_stream, message)
```
3. 在 `_check_and_handle_interruption()` 方法中移除开头的 `return` 语句
## 后续工作
1. 在确认新机制稳定后,完全移除 `stream_loop_manager` 相关代码
2. 清理 `StreamContext` 中的 `stream_loop_task` 字段
3. 移除 `_check_and_handle_interruption` 方法
4. 更新相关文档和注释
## 性能预期
- **资源占用**: 减少(不再为每个流维护独立循环)
- **响应延迟**: 不变(仍基于相同的间隔计算)
- **并发能力**: 提升(完全异步执行,无阻塞)
- **可维护性**: 提升(逻辑更清晰,统一管理)

View File

@@ -1,16 +1,16 @@
"""
消息管理器模块
提供统一的消息管理、上下文管理和流循环调度功能
提供统一的消息管理、上下文管理和基于 scheduler 的消息分发功能
"""
from .context_manager import SingleStreamContextManager
from .distribution_manager import StreamLoopManager, stream_loop_manager
from .message_manager import MessageManager, message_manager
from .scheduler_dispatcher import SchedulerDispatcher, scheduler_dispatcher
__all__ = [
"MessageManager",
"SingleStreamContextManager",
"StreamLoopManager",
"SchedulerDispatcher",
"message_manager",
"stream_loop_manager",
"scheduler_dispatcher",
]

View File

@@ -15,11 +15,9 @@ from src.common.logger import get_logger
from src.config.config import global_config
from src.plugin_system.base.component_types import ChatType
from .distribution_manager import stream_loop_manager
logger = get_logger("context_manager")
# 全局背景任务集合
# 全局背景任务集合(用于异步初始化等后台任务)
_background_tasks = set()
@@ -65,12 +63,20 @@ class SingleStreamContextManager:
bool: 是否成功添加
"""
try:
# 使用MessageManager的内置缓存系统
# 尝试使用MessageManager的内置缓存系统
use_cache_system = False
message_manager = None
try:
from .message_manager import message_manager
from .message_manager import message_manager as mm
message_manager = mm
use_cache_system = message_manager.is_running
except Exception as e:
logger.debug(f"MessageManager不可用使用直接添加: {e}")
use_cache_system = False
# 如果MessageManager正在运行,使用缓存系统
if message_manager.is_running:
if use_cache_system and message_manager:
# 使用缓存系统
try:
# 先计算兴趣值(需要在缓存前计算)
await self._calculate_message_interest(message)
message.is_read = False
@@ -97,18 +103,18 @@ class SingleStreamContextManager:
else:
logger.debug(f"消息已缓存,等待当前处理完成: stream={self.stream_id}")
# 启动流的循环任务(如果还未启动)
task = asyncio.create_task(stream_loop_manager.start_stream_loop(self.stream_id))
_background_tasks.add(task)
task.add_done_callback(_background_tasks.discard)
logger.debug(f"添加消息到缓存系统: {self.stream_id}")
return True
else:
logger.warning(f"消息缓存系统添加失败,回退到直接添加: {self.stream_id}")
use_cache_system = False
except Exception as e:
logger.warning(f"消息缓存系统异常,回退到直接添加: {self.stream_id}, error={e}")
use_cache_system = False
# 回退方案:直接添加到未读消息
# 这部分代码在缓存系统失败或不可用时执行
if not use_cache_system:
message.is_read = False
self.context.unread_messages.append(message)
@@ -119,12 +125,13 @@ class SingleStreamContextManager:
await self._calculate_message_interest(message)
self.total_messages += 1
self.last_access_time = time.time()
# 启动流的循环任务(如果还未启动)
task = asyncio.create_task(stream_loop_manager.start_stream_loop(self.stream_id))
_background_tasks.add(task)
task.add_done_callback(_background_tasks.discard)
logger.debug(f"添加消息{message.processed_plain_text}到单流上下文: {self.stream_id}")
return True
# 不应该到达这里,但为了类型检查添加返回值
return True
except Exception as e:
logger.error(f"添加消息到单流上下文失败 {self.stream_id}: {e}", exc_info=True)
return False

View File

@@ -1,694 +0,0 @@
"""
流循环管理器
为每个聊天流创建独立的无限循环任务,主动轮询处理消息
"""
import asyncio
import time
from typing import Any
from src.chat.chatter_manager import ChatterManager
from src.chat.energy_system import energy_manager
from src.common.data_models.message_manager_data_model import StreamContext
from src.common.logger import get_logger
from src.config.config import global_config
from src.plugin_system.apis.chat_api import get_chat_manager
logger = get_logger("stream_loop_manager")
class StreamLoopManager:
"""流循环管理器 - 每个流一个独立的无限循环任务"""
def __init__(self, max_concurrent_streams: int | None = None):
# 统计信息
self.stats: dict[str, Any] = {
"active_streams": 0,
"total_loops": 0,
"total_process_cycles": 0,
"total_failures": 0,
"start_time": time.time(),
}
# 配置参数
self.max_concurrent_streams = max_concurrent_streams or global_config.chat.max_concurrent_distributions
# 强制分发策略
self.force_dispatch_unread_threshold: int | None = getattr(
global_config.chat, "force_dispatch_unread_threshold", 20
)
self.force_dispatch_min_interval: float = getattr(global_config.chat, "force_dispatch_min_interval", 0.1)
# Chatter管理器
self.chatter_manager: ChatterManager | None = None
# 状态控制
self.is_running = False
# 每个流的上一次间隔值(用于日志去重)
self._last_intervals: dict[str, float] = {}
logger.info(f"流循环管理器初始化完成 (最大并发流数: {self.max_concurrent_streams})")
async def start(self) -> None:
"""启动流循环管理器"""
if self.is_running:
logger.warning("流循环管理器已经在运行")
return
self.is_running = True
async def stop(self) -> None:
"""停止流循环管理器"""
if not self.is_running:
return
self.is_running = False
# 取消所有流循环
try:
# 获取所有活跃的流
from src.plugin_system.apis.chat_api import get_chat_manager
chat_manager = get_chat_manager()
all_streams = await chat_manager.get_all_streams()
# 创建任务列表以便并发取消
cancel_tasks = []
for chat_stream in all_streams:
context = chat_stream.context_manager.context
if context.stream_loop_task and not context.stream_loop_task.done():
context.stream_loop_task.cancel()
cancel_tasks.append((chat_stream.stream_id, context.stream_loop_task))
# 并发等待所有任务取消
if cancel_tasks:
logger.info(f"正在取消 {len(cancel_tasks)} 个流循环任务...")
await asyncio.gather(
*[self._wait_for_task_cancel(stream_id, task) for stream_id, task in cancel_tasks],
return_exceptions=True,
)
logger.info("所有流循环已清理")
except Exception as e:
logger.error(f"停止管理器时出错: {e}")
logger.info("流循环管理器已停止")
async def start_stream_loop(self, stream_id: str, force: bool = False) -> bool:
"""启动指定流的循环任务 - 优化版本使用自适应管理器
Args:
stream_id: 流ID
force: 是否强制启动
Returns:
bool: 是否成功启动
"""
# 获取流上下文
context = await self._get_stream_context(stream_id)
if not context:
logger.warning(f"无法获取流上下文: {stream_id}")
return False
# 快速路径:如果流已存在且不是强制启动,无需处理
if not force and context.stream_loop_task and not context.stream_loop_task.done():
logger.debug(f"{stream_id} 循环已在运行")
return True
# 如果是强制启动且任务仍在运行,先取消旧任务
if force and context.stream_loop_task and not context.stream_loop_task.done():
logger.debug(f"强制启动模式:先取消现有流循环任务: {stream_id}")
old_task = context.stream_loop_task
old_task.cancel()
try:
await asyncio.wait_for(old_task, timeout=2.0)
logger.debug(f"旧流循环任务已结束: {stream_id}")
except (asyncio.TimeoutError, asyncio.CancelledError):
logger.debug(f"旧流循环任务已取消或超时: {stream_id}")
except Exception as e:
logger.warning(f"等待旧任务结束时出错: {e}")
# 创建流循环任务
try:
loop_task = asyncio.create_task(self._stream_loop_worker(stream_id), name=f"stream_loop_{stream_id}")
# 将任务记录到 StreamContext 中
context.stream_loop_task = loop_task
# 更新统计信息
self.stats["active_streams"] += 1
self.stats["total_loops"] += 1
logger.debug(f"启动流循环任务: {stream_id}")
return True
except Exception as e:
logger.error(f"启动流循环任务失败 {stream_id}: {e}")
return False
async def stop_stream_loop(self, stream_id: str) -> bool:
"""停止指定流的循环任务
Args:
stream_id: 流ID
Returns:
bool: 是否成功停止
"""
# 获取流上下文
context = await self._get_stream_context(stream_id)
if not context:
logger.debug(f"{stream_id} 上下文不存在,无需停止")
return False
# 检查是否有 stream_loop_task
if not context.stream_loop_task or context.stream_loop_task.done():
logger.debug(f"{stream_id} 循环不存在或已结束,无需停止")
return False
task = context.stream_loop_task
if not task.done():
task.cancel()
try:
# 设置取消超时,避免无限等待
await asyncio.wait_for(task, timeout=5.0)
except asyncio.CancelledError:
logger.debug(f"流循环任务已取消: {stream_id}")
except asyncio.TimeoutError:
logger.warning(f"流循环任务取消超时: {stream_id}")
except Exception as e:
logger.error(f"等待流循环任务结束时出错: {stream_id} - {e}")
# 清空 StreamContext 中的任务记录
context.stream_loop_task = None
logger.debug(f"停止流循环: {stream_id}")
return True
async def _stream_loop_worker(self, stream_id: str) -> None:
"""单个流的工作循环 - 优化版本
Args:
stream_id: 流ID
"""
logger.debug(f"流循环工作器启动: {stream_id}")
try:
while self.is_running:
try:
# 1. 获取流上下文
context = await self._get_stream_context(stream_id)
if not context:
logger.warning(f"无法获取流上下文: {stream_id}")
await asyncio.sleep(10.0)
continue
# 2. 检查是否有消息需要处理
unread_count = self._get_unread_count(context)
force_dispatch = self._needs_force_dispatch_for_context(context, unread_count)
has_messages = force_dispatch or await self._has_messages_to_process(context)
if has_messages:
if force_dispatch:
logger.info("%s 未读消息 %d 条,触发强制分发", stream_id, unread_count)
# 3. 在处理前更新能量值(用于下次间隔计算)
try:
await self._update_stream_energy(stream_id, context)
except Exception as e:
logger.debug(f"更新流能量失败 {stream_id}: {e}")
# 4. 激活chatter处理
success = await self._process_stream_messages(stream_id, context)
# 更新统计
self.stats["total_process_cycles"] += 1
if success:
logger.debug(f"流处理成功: {stream_id}")
else:
self.stats["total_failures"] += 1
logger.warning(f"流处理失败: {stream_id}")
# 5. 计算下次检查间隔
interval = await self._calculate_interval(stream_id, has_messages)
# 6. sleep等待下次检查
# 只在间隔发生变化时输出日志,避免刷屏
last_interval = self._last_intervals.get(stream_id)
if last_interval is None or abs(interval - last_interval) > 0.01:
logger.info(f"{stream_id} 等待周期变化: {interval:.2f}s")
self._last_intervals[stream_id] = interval
await asyncio.sleep(interval)
except asyncio.CancelledError:
logger.debug(f"流循环被取消: {stream_id}")
break
except Exception as e:
logger.error(f"流循环出错 {stream_id}: {e}", exc_info=True)
self.stats["total_failures"] += 1
await asyncio.sleep(5.0) # 错误时等待5秒再重试
finally:
# 清理 StreamContext 中的任务记录
try:
context = await self._get_stream_context(stream_id)
if context and context.stream_loop_task:
context.stream_loop_task = None
logger.debug(f"清理 StreamContext 中的流循环任务: {stream_id}")
except Exception as e:
logger.debug(f"清理 StreamContext 任务记录失败: {e}")
# 清理间隔记录
self._last_intervals.pop(stream_id, None)
logger.debug(f"流循环结束: {stream_id}")
async def _get_stream_context(self, stream_id: str) -> Any | None:
"""获取流上下文
Args:
stream_id: 流ID
Returns:
Optional[Any]: 流上下文如果不存在返回None
"""
try:
chat_manager = get_chat_manager()
chat_stream = await chat_manager.get_stream(stream_id)
if chat_stream:
return chat_stream.context_manager.context
return None
except Exception as e:
logger.error(f"获取流上下文失败 {stream_id}: {e}")
return None
async def _has_messages_to_process(self, context: StreamContext) -> bool:
"""检查是否有消息需要处理
Args:
context: 流上下文
Returns:
bool: 是否有未读消息
"""
try:
# 检查是否有未读消息
if hasattr(context, "unread_messages") and context.unread_messages:
return True
# 检查其他需要处理的条件
if hasattr(context, "has_pending_messages") and context.has_pending_messages:
return True
return False
except Exception as e:
logger.error(f"检查消息状态失败: {e}")
return False
async def _process_stream_messages(self, stream_id: str, context: StreamContext) -> bool:
"""处理流消息 - 支持子任务管理
Args:
stream_id: 流ID
context: 流上下文
Returns:
bool: 是否处理成功
"""
if not self.chatter_manager:
logger.warning(f"Chatter管理器未设置: {stream_id}")
return False
# 设置处理状态为正在处理
self._set_stream_processing_status(stream_id, True)
# 子任务跟踪
child_tasks = set()
try:
start_time = time.time()
# 注意缓存消息刷新已移至planner开始时执行动作修改器之后此处不再刷新
# 设置触发用户ID以实现回复保护
last_message = context.get_last_message()
if last_message:
context.triggering_user_id = last_message.user_info.user_id
# 创建子任务用于刷新能量(不阻塞主流程)
energy_task = asyncio.create_task(self._refresh_focus_energy(stream_id))
child_tasks.add(energy_task)
energy_task.add_done_callback(lambda t: child_tasks.discard(t))
# 设置 Chatter 正在处理的标志
context.is_chatter_processing = True
logger.debug(f"设置 Chatter 处理标志: {stream_id}")
# 直接调用chatter_manager处理流上下文
results = await self.chatter_manager.process_stream_context(stream_id, context)
success = results.get("success", False)
if success:
# 处理成功后,再次刷新缓存中可能的新消息
additional_messages = await self._flush_cached_messages_to_unread(stream_id)
if additional_messages:
logger.debug(f"处理完成后刷新新消息: stream={stream_id}, 数量={len(additional_messages)}")
process_time = time.time() - start_time
logger.debug(f"流处理成功: {stream_id} (耗时: {process_time:.2f}s)")
else:
logger.warning(f"流处理失败: {stream_id} - {results.get('error_message', '未知错误')}")
return success
except asyncio.CancelledError:
logger.debug(f"流处理被取消: {stream_id}")
# 取消所有子任务
for child_task in child_tasks:
if not child_task.done():
child_task.cancel()
raise
except Exception as e:
logger.error(f"流处理异常: {stream_id} - {e}", exc_info=True)
# 异常时也要清理子任务
for child_task in child_tasks:
if not child_task.done():
child_task.cancel()
return False
finally:
# 清除 Chatter 处理标志
context.is_chatter_processing = False
logger.debug(f"清除 Chatter 处理标志: {stream_id}")
# 无论成功或失败,都要设置处理状态为未处理
self._set_stream_processing_status(stream_id, False)
def _set_stream_processing_status(self, stream_id: str, is_processing: bool) -> None:
"""设置流的处理状态"""
try:
from .message_manager import message_manager
if message_manager.is_running:
message_manager.set_stream_processing_status(stream_id, is_processing)
logger.debug(f"设置流处理状态: stream={stream_id}, processing={is_processing}")
except ImportError:
logger.debug("MessageManager不可用跳过状态设置")
except Exception as e:
logger.warning(f"设置流处理状态失败: stream={stream_id}, error={e}")
async def _flush_cached_messages_to_unread(self, stream_id: str) -> list:
"""将缓存消息刷新到未读消息列表"""
try:
from .message_manager import message_manager
if message_manager.is_running and message_manager.has_cached_messages(stream_id):
# 获取缓存消息
cached_messages = message_manager.flush_cached_messages(stream_id)
if cached_messages:
# 获取聊天流并添加到未读消息
from src.plugin_system.apis.chat_api import get_chat_manager
chat_manager = get_chat_manager()
chat_stream = await chat_manager.get_stream(stream_id)
if chat_stream:
for message in cached_messages:
chat_stream.context_manager.context.unread_messages.append(message)
logger.debug(f"刷新缓存消息到未读列表: stream={stream_id}, 数量={len(cached_messages)}")
else:
logger.warning(f"无法找到聊天流: {stream_id}")
return cached_messages
return []
except ImportError:
logger.debug("MessageManager不可用跳过缓存刷新")
return []
except Exception as e:
logger.warning(f"刷新缓存消息失败: stream={stream_id}, error={e}")
return []
async def _update_stream_energy(self, stream_id: str, context: Any) -> None:
"""更新流的能量值
Args:
stream_id: 流ID
context: 流上下文 (StreamContext)
"""
try:
from src.chat.message_receive.chat_stream import get_chat_manager
# 获取聊天流
chat_manager = get_chat_manager()
chat_stream = await chat_manager.get_stream(stream_id)
if not chat_stream:
logger.debug(f"无法找到聊天流 {stream_id},跳过能量更新")
return
# 从 context_manager 获取消息(包括未读和历史消息)
# 合并未读消息和历史消息
all_messages = []
# 添加历史消息
history_messages = context.get_history_messages(limit=global_config.chat.max_context_size)
all_messages.extend(history_messages)
# 添加未读消息
unread_messages = context.get_unread_messages()
all_messages.extend(unread_messages)
# 按时间排序并限制数量
all_messages.sort(key=lambda m: m.time)
messages = all_messages[-global_config.chat.max_context_size:]
# 获取用户ID
user_id = None
if context.triggering_user_id:
user_id = context.triggering_user_id
# 使用能量管理器计算并缓存能量值
energy = await energy_manager.calculate_focus_energy(
stream_id=stream_id,
messages=messages,
user_id=user_id
)
# 同步更新到 ChatStream
chat_stream._focus_energy = energy
logger.debug(f"已更新流 {stream_id} 的能量值: {energy:.3f}")
except Exception as e:
logger.warning(f"更新流能量失败 {stream_id}: {e}", exc_info=False)
async def _calculate_interval(self, stream_id: str, has_messages: bool) -> float:
"""计算下次检查间隔
Args:
stream_id: 流ID
has_messages: 本次是否有消息处理
Returns:
float: 间隔时间(秒)
"""
# 基础间隔
base_interval = getattr(global_config.chat, "distribution_interval", 5.0)
# 如果没有消息,使用更长的间隔
if not has_messages:
return base_interval * 2.0 # 无消息时间隔加倍
# 尝试使用能量管理器计算间隔
try:
# 获取当前focus_energy
focus_energy = energy_manager.energy_cache.get(stream_id, (0.5, 0))[0]
# 使用能量管理器计算间隔
interval = energy_manager.get_distribution_interval(focus_energy)
logger.debug(f"{stream_id} 动态间隔: {interval:.2f}s (能量: {focus_energy:.3f})")
return interval
except Exception as e:
logger.debug(f"{stream_id} 使用默认间隔: {base_interval:.2f}s ({e})")
return base_interval
def get_queue_status(self) -> dict[str, Any]:
"""获取队列状态
Returns:
Dict[str, Any]: 队列状态信息
"""
current_time = time.time()
uptime = current_time - self.stats["start_time"] if self.is_running else 0
# 从统计信息中获取活跃流数量
active_streams = self.stats.get("active_streams", 0)
return {
"active_streams": active_streams,
"total_loops": self.stats["total_loops"],
"max_concurrent": self.max_concurrent_streams,
"is_running": self.is_running,
"uptime": uptime,
"total_process_cycles": self.stats["total_process_cycles"],
"total_failures": self.stats["total_failures"],
"stats": self.stats.copy(),
}
def set_chatter_manager(self, chatter_manager: ChatterManager) -> None:
"""设置chatter管理器
Args:
chatter_manager: chatter管理器实例
"""
self.chatter_manager = chatter_manager
logger.debug(f"设置chatter管理器: {chatter_manager.__class__.__name__}")
async def _should_force_dispatch_for_stream(self, stream_id: str) -> bool:
if not self.force_dispatch_unread_threshold or self.force_dispatch_unread_threshold <= 0:
return False
try:
chat_manager = get_chat_manager()
chat_stream = await chat_manager.get_stream(stream_id)
if not chat_stream:
return False
unread = getattr(chat_stream.context_manager.context, "unread_messages", [])
return len(unread) > self.force_dispatch_unread_threshold
except Exception as e:
logger.debug(f"检查流 {stream_id} 是否需要强制分发失败: {e}")
return False
def _get_unread_count(self, context: StreamContext) -> int:
try:
unread_messages = context.unread_messages
if unread_messages is None:
return 0
return len(unread_messages)
except Exception:
return 0
def _needs_force_dispatch_for_context(self, context: StreamContext, unread_count: int | None = None) -> bool:
if not self.force_dispatch_unread_threshold or self.force_dispatch_unread_threshold <= 0:
return False
count = unread_count if unread_count is not None else self._get_unread_count(context)
return count > self.force_dispatch_unread_threshold
def get_performance_summary(self) -> dict[str, Any]:
"""获取性能摘要
Returns:
Dict[str, Any]: 性能摘要
"""
current_time = time.time()
uptime = current_time - self.stats["start_time"]
# 计算吞吐量
throughput = self.stats["total_process_cycles"] / max(1, uptime / 3600) # 每小时处理次数
# 从统计信息中获取活跃流数量
active_streams = self.stats.get("active_streams", 0)
return {
"uptime_hours": uptime / 3600,
"active_streams": active_streams,
"total_process_cycles": self.stats["total_process_cycles"],
"total_failures": self.stats["total_failures"],
"throughput_per_hour": throughput,
"max_concurrent_streams": self.max_concurrent_streams,
}
async def _refresh_focus_energy(self, stream_id: str) -> None:
"""分发完成后基于历史消息刷新能量值"""
try:
chat_manager = get_chat_manager()
chat_stream = await chat_manager.get_stream(stream_id)
if not chat_stream:
logger.debug(f"刷新能量时未找到聊天流: {stream_id}")
return
await chat_stream.context_manager.refresh_focus_energy_from_history()
logger.debug(f"已刷新聊天流 {stream_id} 的聚焦能量")
except Exception as e:
logger.warning(f"刷新聊天流 {stream_id} 能量失败: {e}")
async def _wait_for_task_cancel(self, stream_id: str, task: asyncio.Task) -> None:
"""等待任务取消完成,带有超时控制
Args:
stream_id: 流ID
task: 要等待取消的任务
"""
try:
await asyncio.wait_for(task, timeout=5.0)
logger.debug(f"流循环任务已正常结束: {stream_id}")
except asyncio.CancelledError:
logger.debug(f"流循环任务已取消: {stream_id}")
except asyncio.TimeoutError:
logger.warning(f"流循环任务取消超时: {stream_id}")
except Exception as e:
logger.error(f"等待流循环任务结束时出错: {stream_id} - {e}")
async def _force_dispatch_stream(self, stream_id: str) -> None:
"""强制分发流处理
当流的未读消息超过阈值时,强制触发分发处理
这个方法主要用于突破并发限制时的紧急处理
注意:此方法目前未被使用,相关功能已集成到 start_stream_loop 方法中
Args:
stream_id: 流ID
"""
logger.debug(f"强制分发流处理: {stream_id}")
try:
# 获取流上下文
context = await self._get_stream_context(stream_id)
if not context:
logger.warning(f"强制分发时未找到流上下文: {stream_id}")
return
# 检查是否有现有的 stream_loop_task
if context.stream_loop_task and not context.stream_loop_task.done():
logger.debug(f"发现现有流循环 {stream_id},将先取消再重新创建")
existing_task = context.stream_loop_task
existing_task.cancel()
# 创建异步任务来等待取消完成,并添加异常处理
cancel_task = asyncio.create_task(
self._wait_for_task_cancel(stream_id, existing_task), name=f"cancel_existing_loop_{stream_id}"
)
# 为取消任务添加异常处理,避免孤儿任务
cancel_task.add_done_callback(
lambda task: logger.debug(f"取消任务完成: {stream_id}")
if not task.exception()
else logger.error(f"取消任务异常: {stream_id} - {task.exception()}")
)
# 检查未读消息数量
unread_count = self._get_unread_count(context)
logger.info(f"{stream_id} 当前未读消息数: {unread_count}")
# 使用 start_stream_loop 重新创建流循环任务
success = await self.start_stream_loop(stream_id, force=True)
if success:
logger.info(f"已创建强制分发流循环: {stream_id}")
else:
logger.warning(f"创建强制分发流循环失败: {stream_id}")
except Exception as e:
logger.error(f"强制分发流处理失败 {stream_id}: {e}", exc_info=True)
# 全局流循环管理器实例
stream_loop_manager = StreamLoopManager()

View File

@@ -18,8 +18,8 @@ from src.common.logger import get_logger
from src.config.config import global_config
from src.plugin_system.apis.chat_api import get_chat_manager
from .distribution_manager import stream_loop_manager
from .global_notice_manager import NoticeScope, global_notice_manager
from .scheduler_dispatcher import scheduler_dispatcher
if TYPE_CHECKING:
pass
@@ -74,11 +74,16 @@ class MessageManager:
# 启动消息缓存系统(内置)
logger.debug("消息缓存系统已启动")
# 启动流循环管理器并设置chatter_manager
await stream_loop_manager.start()
stream_loop_manager.set_chatter_manager(self.chatter_manager)
# 启动基于 scheduler 的消息分发器
await scheduler_dispatcher.start()
scheduler_dispatcher.set_chatter_manager(self.chatter_manager)
logger.info("消息管理器已启动")
# 保留旧的流循环管理器(暂时)以便平滑过渡
# TODO: 在确认新机制稳定后移除
# await stream_loop_manager.start()
# stream_loop_manager.set_chatter_manager(self.chatter_manager)
logger.info("消息管理器已启动(使用 Scheduler 分发器)")
async def stop(self):
"""停止消息管理器"""
@@ -101,13 +106,22 @@ class MessageManager:
self.stream_processing_status.clear()
logger.debug("消息缓存系统已停止")
# 停止流循环管理
await stream_loop_manager.stop()
# 停止基于 scheduler 的消息分发
await scheduler_dispatcher.stop()
# 停止旧的流循环管理器(如果启用)
# await stream_loop_manager.stop()
logger.info("消息管理器已停止")
async def add_message(self, stream_id: str, message: DatabaseMessages):
"""添加消息到指定聊天流"""
"""添加消息到指定聊天流
新的流程:
1. 检查 notice 消息
2. 将消息添加到上下文(缓存)
3. 通知 scheduler_dispatcher 处理(检查打断、创建/更新 schedule
"""
try:
# 检查是否为notice消息
@@ -130,9 +144,14 @@ class MessageManager:
if not chat_stream:
logger.warning(f"MessageManager.add_message: 聊天流 {stream_id} 不存在")
return
await self._check_and_handle_interruption(chat_stream, message)
# 将消息添加到上下文
await chat_stream.context_manager.add_message(message)
# 通知 scheduler_dispatcher 处理消息接收事件
# dispatcher 会检查是否需要打断、创建或更新 schedule
await scheduler_dispatcher.on_message_received(stream_id)
except Exception as e:
logger.error(f"添加消息到聊天流 {stream_id} 时发生错误: {e}")
@@ -299,122 +318,9 @@ class MessageManager:
except Exception as e:
logger.error(f"清理不活跃聊天流时发生错误: {e}")
async def _check_and_handle_interruption(self, chat_stream: ChatStream | None = None, message: DatabaseMessages | None = None):
"""检查并处理消息打断 - 通过取消 stream_loop_task 实现"""
if not global_config.chat.interruption_enabled or not chat_stream or not message:
return
# 检查是否正在回复,以及是否允许在回复时打断
if chat_stream.context_manager.context.is_replying:
if not global_config.chat.allow_reply_interruption:
logger.debug(f"聊天流 {chat_stream.stream_id} 正在回复中,且配置不允许回复时打断,跳过打断检查")
return
else:
logger.debug(f"聊天流 {chat_stream.stream_id} 正在回复中,但配置允许回复时打断")
# 检查是否为表情包消息
if message.is_picid or message.is_emoji:
logger.info(f"消息 {message.message_id} 是表情包或Emoji跳过打断检查")
return
# 检查上下文
context = chat_stream.context_manager.context
# 只有当 Chatter 真正在处理时才检查打断
if not context.is_chatter_processing:
logger.debug(f"聊天流 {chat_stream.stream_id} Chatter 未在处理,跳过打断检查")
return
# 检查是否有 stream_loop_task 在运行
stream_loop_task = context.stream_loop_task
if stream_loop_task and not stream_loop_task.done():
# 检查触发用户ID
triggering_user_id = context.triggering_user_id
if triggering_user_id and message.user_info.user_id != triggering_user_id:
logger.info(f"消息来自非触发用户 {message.user_info.user_id},实际触发用户为 {triggering_user_id},跳过打断检查")
return
# 计算打断概率
interruption_probability = context.calculate_interruption_probability(
global_config.chat.interruption_max_limit
)
# 检查是否已达到最大打断次数
if context.interruption_count >= global_config.chat.interruption_max_limit:
logger.debug(
f"聊天流 {chat_stream.stream_id} 已达到最大打断次数 {context.interruption_count}/{global_config.chat.interruption_max_limit},跳过打断检查"
)
return
# 根据概率决定是否打断
if random.random() < interruption_probability:
logger.info(f"聊天流 {chat_stream.stream_id} 触发消息打断,打断概率: {interruption_probability:.2f}")
# 取消 stream_loop_task子任务会通过 try-catch 自动取消
try:
stream_loop_task.cancel()
# 等待任务真正结束(设置超时避免死锁)
try:
await asyncio.wait_for(stream_loop_task, timeout=2.0)
logger.info(f"流循环任务已完全结束: {chat_stream.stream_id}")
except asyncio.TimeoutError:
logger.warning(f"等待流循环任务结束超时: {chat_stream.stream_id}")
except asyncio.CancelledError:
logger.info(f"流循环任务已被取消: {chat_stream.stream_id}")
except Exception as e:
logger.warning(f"取消流循环任务失败: {chat_stream.stream_id} - {e}")
# 增加打断计数
await context.increment_interruption_count()
# 打断后重新创建 stream_loop 任务
await self._trigger_reprocess(chat_stream)
# 检查是否已达到最大次数
if context.interruption_count >= global_config.chat.interruption_max_limit:
logger.warning(
f"聊天流 {chat_stream.stream_id} 已达到最大打断次数 {context.interruption_count}/{global_config.chat.interruption_max_limit},后续消息将不再打断"
)
else:
logger.info(
f"聊天流 {chat_stream.stream_id} 已打断并重新进入处理流程,当前打断次数: {context.interruption_count}/{global_config.chat.interruption_max_limit}"
)
else:
logger.debug(f"聊天流 {chat_stream.stream_id} 未触发打断,打断概率: {interruption_probability:.2f}")
async def _trigger_reprocess(self, chat_stream: ChatStream):
"""重新处理聊天流的核心逻辑 - 重新创建 stream_loop 任务"""
try:
stream_id = chat_stream.stream_id
logger.info(f"🚀 打断后重新创建流循环任务: {stream_id}")
# 等待一小段时间确保当前消息已经添加到未读消息中
await asyncio.sleep(0.1)
# 获取当前的stream context
context = chat_stream.context_manager.context
# 确保有未读消息需要处理
unread_messages = context.get_unread_messages()
if not unread_messages:
logger.debug(f"聊天流 {stream_id} 没有未读消息,跳过重新处理")
return
logger.debug(f"准备重新处理 {len(unread_messages)} 条未读消息: {stream_id}")
# 重新创建 stream_loop 任务
success = await stream_loop_manager.start_stream_loop(stream_id, force=True)
if success:
logger.debug(f"成功重新创建流循环任务: {stream_id}")
else:
logger.warning(f"重新创建流循环任务失败: {stream_id}")
except Exception as e:
logger.error(f"触发重新处理时出错: {e}")
# === 已废弃的方法已移除 ===
# _check_and_handle_interruption 和 _trigger_reprocess 已由 scheduler_dispatcher 接管
# 如需查看历史代码,请参考 git 历史记录
async def clear_all_unread_messages(self, stream_id: str):
"""清除指定上下文中的所有未读消息,在消息处理完成后调用"""

View File

@@ -0,0 +1,534 @@
"""
基于 unified_scheduler 的消息分发管理器
替代原有的 stream_loop_task 循环机制,使用统一的调度器来管理消息处理时机
"""
import asyncio
import time
from typing import Any
from src.chat.chatter_manager import ChatterManager
from src.chat.energy_system import energy_manager
from src.common.data_models.message_manager_data_model import StreamContext
from src.common.logger import get_logger
from src.config.config import global_config
from src.plugin_system.apis.chat_api import get_chat_manager
from src.schedule.unified_scheduler import TriggerType, unified_scheduler
logger = get_logger("scheduler_dispatcher")
class SchedulerDispatcher:
"""基于 scheduler 的消息分发器
工作流程:
1. 接收消息时,将消息添加到聊天流上下文
2. 检查是否有活跃的 schedule如果没有则创建
3. 如果有,检查打断判定,成功则移除旧 schedule 并创建新的
4. schedule 到期时,激活 chatter 处理
5. 处理完成后,计算下次间隔并注册新 schedule
"""
def __init__(self):
# 追踪每个流的 schedule_id
self.stream_schedules: dict[str, str] = {} # stream_id -> schedule_id
# Chatter 管理器
self.chatter_manager: ChatterManager | None = None
# 统计信息
self.stats = {
"total_schedules_created": 0,
"total_schedules_cancelled": 0,
"total_interruptions": 0,
"total_process_cycles": 0,
"total_failures": 0,
"start_time": time.time(),
}
self.is_running = False
logger.info("基于 Scheduler 的消息分发器初始化完成")
async def start(self) -> None:
"""启动分发器"""
if self.is_running:
logger.warning("分发器已在运行")
return
self.is_running = True
logger.info("基于 Scheduler 的消息分发器已启动")
async def stop(self) -> None:
"""停止分发器"""
if not self.is_running:
return
self.is_running = False
# 取消所有活跃的 schedule
schedule_ids = list(self.stream_schedules.values())
for schedule_id in schedule_ids:
try:
await unified_scheduler.remove_schedule(schedule_id)
except Exception as e:
logger.error(f"移除 schedule {schedule_id} 失败: {e}")
self.stream_schedules.clear()
logger.info("基于 Scheduler 的消息分发器已停止")
def set_chatter_manager(self, chatter_manager: ChatterManager) -> None:
"""设置 Chatter 管理器"""
self.chatter_manager = chatter_manager
logger.debug(f"设置 Chatter 管理器: {chatter_manager.__class__.__name__}")
async def on_message_received(self, stream_id: str) -> None:
"""消息接收时的处理逻辑
Args:
stream_id: 聊天流ID
"""
if not self.is_running:
logger.warning("分发器未运行,忽略消息")
return
try:
# 1. 获取流上下文
context = await self._get_stream_context(stream_id)
if not context:
logger.warning(f"无法获取流上下文: {stream_id}")
return
# 2. 检查是否有活跃的 schedule
has_active_schedule = stream_id in self.stream_schedules
if has_active_schedule:
# 3. 检查打断判定
should_interrupt = await self._check_interruption(stream_id, context)
if should_interrupt:
# 移除旧 schedule 并创建新的
await self._cancel_and_recreate_schedule(stream_id, context)
logger.debug(f"⚡ 打断成功: 流={stream_id[:8]}..., 已重新创建 schedule")
else:
logger.debug(f"打断判定失败,保持原有 schedule: 流={stream_id[:8]}...")
else:
# 4. 创建新的 schedule
await self._create_schedule(stream_id, context)
except Exception as e:
logger.error(f"处理消息接收事件失败 {stream_id}: {e}", exc_info=True)
async def _get_stream_context(self, stream_id: str) -> StreamContext | None:
"""获取流上下文"""
try:
chat_manager = get_chat_manager()
chat_stream = await chat_manager.get_stream(stream_id)
if chat_stream:
return chat_stream.context_manager.context
return None
except Exception as e:
logger.error(f"获取流上下文失败 {stream_id}: {e}")
return None
async def _check_interruption(self, stream_id: str, context: StreamContext) -> bool:
"""检查是否应该打断当前处理
Args:
stream_id: 流ID
context: 流上下文
Returns:
bool: 是否应该打断
"""
# 检查是否启用打断
if not global_config.chat.interruption_enabled:
return False
# 检查是否正在回复,以及是否允许在回复时打断
if context.is_replying:
if not global_config.chat.allow_reply_interruption:
logger.debug(f"聊天流 {stream_id} 正在回复中,且配置不允许回复时打断")
return False
else:
logger.debug(f"聊天流 {stream_id} 正在回复中,但配置允许回复时打断")
# 只有当 Chatter 真正在处理时才检查打断
if not context.is_chatter_processing:
logger.debug(f"聊天流 {stream_id} Chatter 未在处理,无需打断")
return False
# 检查最后一条消息
last_message = context.get_last_message()
if not last_message:
return False
# 检查是否为表情包消息
if last_message.is_picid or last_message.is_emoji:
logger.info(f"消息 {last_message.message_id} 是表情包或Emoji跳过打断检查")
return False
# 检查触发用户ID
triggering_user_id = context.triggering_user_id
if triggering_user_id and last_message.user_info.user_id != triggering_user_id:
logger.info(f"消息来自非触发用户 {last_message.user_info.user_id},实际触发用户为 {triggering_user_id},跳过打断检查")
return False
# 检查是否已达到最大打断次数
if context.interruption_count >= global_config.chat.interruption_max_limit:
logger.debug(
f"聊天流 {stream_id} 已达到最大打断次数 {context.interruption_count}/{global_config.chat.interruption_max_limit}"
)
return False
# 计算打断概率
interruption_probability = context.calculate_interruption_probability(
global_config.chat.interruption_max_limit
)
# 根据概率决定是否打断
import random
if random.random() < interruption_probability:
logger.debug(f"聊天流 {stream_id} 触发消息打断,打断概率: {interruption_probability:.2f}")
# 增加打断计数
await context.increment_interruption_count()
self.stats["total_interruptions"] += 1
# 检查是否已达到最大次数
if context.interruption_count >= global_config.chat.interruption_max_limit:
logger.warning(
f"聊天流 {stream_id} 已达到最大打断次数 {context.interruption_count}/{global_config.chat.interruption_max_limit},后续消息将不再打断"
)
else:
logger.info(
f"聊天流 {stream_id} 已打断,当前打断次数: {context.interruption_count}/{global_config.chat.interruption_max_limit}"
)
return True
else:
logger.debug(f"聊天流 {stream_id} 未触发打断,打断概率: {interruption_probability:.2f}")
return False
async def _cancel_and_recreate_schedule(self, stream_id: str, context: StreamContext) -> None:
"""取消旧的 schedule 并创建新的(打断模式,使用极短延迟)
Args:
stream_id: 流ID
context: 流上下文
"""
# 移除旧的 schedule
old_schedule_id = self.stream_schedules.get(stream_id)
if old_schedule_id:
success = await unified_scheduler.remove_schedule(old_schedule_id)
if success:
logger.debug(f"🔄 已移除旧 schedule 并准备重建: 流={stream_id[:8]}..., ID={old_schedule_id[:8]}...")
self.stats["total_schedules_cancelled"] += 1
else:
logger.warning(f"移除旧 schedule 失败: {stream_id}")
# 从追踪中删除
del self.stream_schedules[stream_id]
# 创建新的 schedule使用即时处理模式极短延迟
await self._create_schedule(stream_id, context, immediate_mode=True)
async def _create_schedule(self, stream_id: str, context: StreamContext, immediate_mode: bool = False) -> None:
"""为聊天流创建新的 schedule
Args:
stream_id: 流ID
context: 流上下文
immediate_mode: 是否使用即时处理模式(打断时使用极短延迟)
"""
try:
# 如果是即时处理模式打断时使用固定的1秒延迟立即重新处理
if immediate_mode:
delay = 1.0 # 硬编码1秒延迟确保打断后能快速重新处理
logger.debug(
f"⚡ 打断模式启用: 流={stream_id[:8]}..., "
f"使用即时延迟={delay:.1f}s 立即重新处理"
)
else:
# 常规模式:计算初始延迟
delay = await self._calculate_initial_delay(stream_id, context)
# 获取未读消息数量用于日志
unread_count = len(context.unread_messages) if context.unread_messages else 0
# 创建 schedule
schedule_id = await unified_scheduler.create_schedule(
callback=self._on_schedule_triggered,
trigger_type=TriggerType.TIME,
trigger_config={"delay_seconds": delay},
is_recurring=False, # 一次性任务,处理完后会创建新的
task_name=f"dispatch_{stream_id[:8]}",
callback_args=(stream_id,),
)
# 追踪 schedule
self.stream_schedules[stream_id] = schedule_id
self.stats["total_schedules_created"] += 1
mode_indicator = "⚡打断" if immediate_mode else "📅常规"
logger.debug(
f"{mode_indicator} 创建 schedule: 流={stream_id[:8]}..., "
f"延迟={delay:.3f}s, 未读={unread_count}, "
f"ID={schedule_id[:8]}..."
)
except Exception as e:
logger.error(f"创建 schedule 失败 {stream_id}: {e}", exc_info=True)
async def _calculate_initial_delay(self, stream_id: str, context: StreamContext) -> float:
"""计算初始延迟时间
Args:
stream_id: 流ID
context: 流上下文
Returns:
float: 延迟时间(秒)
"""
# 基础间隔
base_interval = getattr(global_config.chat, "distribution_interval", 5.0)
# 检查是否有未读消息
unread_count = len(context.unread_messages) if context.unread_messages else 0
# 强制分发阈值
force_dispatch_threshold = getattr(global_config.chat, "force_dispatch_unread_threshold", 20)
# 如果未读消息过多,使用最小间隔
if force_dispatch_threshold and unread_count > force_dispatch_threshold:
min_interval = getattr(global_config.chat, "force_dispatch_min_interval", 0.1)
logger.warning(
f"⚠️ 强制分发触发: 流={stream_id[:8]}..., "
f"未读={unread_count} (阈值={force_dispatch_threshold}), "
f"使用最小间隔={min_interval}s"
)
return min_interval
# 尝试使用能量管理器计算间隔
try:
# 更新能量值
await self._update_stream_energy(stream_id, context)
# 获取当前 focus_energy
focus_energy = energy_manager.energy_cache.get(stream_id, (0.5, 0))[0]
# 使用能量管理器计算间隔
interval = energy_manager.get_distribution_interval(focus_energy)
logger.info(
f"📊 动态间隔计算: 流={stream_id[:8]}..., "
f"能量={focus_energy:.3f}, 间隔={interval:.2f}s"
)
return interval
except Exception as e:
logger.info(
f"📊 使用默认间隔: 流={stream_id[:8]}..., "
f"间隔={base_interval:.2f}s (动态计算失败: {e})"
)
return base_interval
async def _update_stream_energy(self, stream_id: str, context: StreamContext) -> None:
"""更新流的能量值
Args:
stream_id: 流ID
context: 流上下文
"""
try:
from src.chat.message_receive.chat_stream import get_chat_manager
# 获取聊天流
chat_manager = get_chat_manager()
chat_stream = await chat_manager.get_stream(stream_id)
if not chat_stream:
logger.debug(f"无法找到聊天流 {stream_id},跳过能量更新")
return
# 合并未读消息和历史消息
all_messages = []
# 添加历史消息
history_messages = context.get_history_messages(limit=global_config.chat.max_context_size)
all_messages.extend(history_messages)
# 添加未读消息
unread_messages = context.get_unread_messages()
all_messages.extend(unread_messages)
# 按时间排序并限制数量
all_messages.sort(key=lambda m: m.time)
messages = all_messages[-global_config.chat.max_context_size:]
# 获取用户ID
user_id = context.triggering_user_id
# 使用能量管理器计算并缓存能量值
energy = await energy_manager.calculate_focus_energy(
stream_id=stream_id,
messages=messages,
user_id=user_id
)
# 同步更新到 ChatStream
chat_stream._focus_energy = energy
logger.debug(f"已更新流 {stream_id} 的能量值: {energy:.3f}")
except Exception as e:
logger.warning(f"更新流能量失败 {stream_id}: {e}", exc_info=False)
async def _on_schedule_triggered(self, stream_id: str) -> None:
"""schedule 触发时的回调
Args:
stream_id: 流ID
"""
try:
logger.info(f"⏰ Schedule 触发: 流={stream_id[:8]}..., 开始处理消息")
# 从追踪中移除(因为是一次性任务)
self.stream_schedules.pop(stream_id, None)
# 获取流上下文
context = await self._get_stream_context(stream_id)
if not context:
logger.warning(f"Schedule 触发时无法获取流上下文: {stream_id}")
return
# 检查是否有未读消息
if not context.unread_messages:
logger.debug(f"{stream_id} 没有未读消息,跳过处理")
return
# 激活 chatter 处理
success = await self._process_stream(stream_id, context)
# 更新统计
self.stats["total_process_cycles"] += 1
if not success:
self.stats["total_failures"] += 1
# 处理完成后,创建新的 schedule
await self._create_schedule(stream_id, context)
except Exception as e:
logger.error(f"Schedule 回调执行失败 {stream_id}: {e}", exc_info=True)
async def _process_stream(self, stream_id: str, context: StreamContext) -> bool:
"""处理流消息
Args:
stream_id: 流ID
context: 流上下文
Returns:
bool: 是否处理成功
"""
if not self.chatter_manager:
logger.warning(f"Chatter 管理器未设置: {stream_id}")
return False
# 设置处理状态
self._set_stream_processing_status(stream_id, True)
try:
start_time = time.time()
# 设置触发用户ID
last_message = context.get_last_message()
if last_message:
context.triggering_user_id = last_message.user_info.user_id
# 创建异步任务刷新能量(不阻塞主流程)
energy_task = asyncio.create_task(self._refresh_focus_energy(stream_id))
# 设置 Chatter 正在处理的标志
context.is_chatter_processing = True
logger.debug(f"设置 Chatter 处理标志: {stream_id}")
try:
# 调用 chatter_manager 处理流上下文
results = await self.chatter_manager.process_stream_context(stream_id, context)
success = results.get("success", False)
if success:
process_time = time.time() - start_time
logger.debug(f"流处理成功: {stream_id} (耗时: {process_time:.2f}s)")
else:
logger.warning(f"流处理失败: {stream_id} - {results.get('error_message', '未知错误')}")
return success
finally:
# 清除 Chatter 处理标志
context.is_chatter_processing = False
logger.debug(f"清除 Chatter 处理标志: {stream_id}")
# 等待能量刷新任务完成
try:
await asyncio.wait_for(energy_task, timeout=5.0)
except asyncio.TimeoutError:
logger.warning(f"等待能量刷新超时: {stream_id}")
except Exception as e:
logger.debug(f"能量刷新任务异常: {e}")
except Exception as e:
logger.error(f"流处理异常: {stream_id} - {e}", exc_info=True)
return False
finally:
# 设置处理状态为未处理
self._set_stream_processing_status(stream_id, False)
def _set_stream_processing_status(self, stream_id: str, is_processing: bool) -> None:
"""设置流的处理状态"""
try:
from src.chat.message_manager.message_manager import message_manager
if message_manager.is_running:
message_manager.set_stream_processing_status(stream_id, is_processing)
logger.debug(f"设置流处理状态: stream={stream_id}, processing={is_processing}")
except ImportError:
logger.debug("MessageManager 不可用,跳过状态设置")
except Exception as e:
logger.warning(f"设置流处理状态失败: stream={stream_id}, error={e}")
async def _refresh_focus_energy(self, stream_id: str) -> None:
"""分发完成后刷新能量值"""
try:
chat_manager = get_chat_manager()
chat_stream = await chat_manager.get_stream(stream_id)
if not chat_stream:
logger.debug(f"刷新能量时未找到聊天流: {stream_id}")
return
await chat_stream.context_manager.refresh_focus_energy_from_history()
logger.debug(f"已刷新聊天流 {stream_id} 的聚焦能量")
except Exception as e:
logger.warning(f"刷新聊天流 {stream_id} 能量失败: {e}")
def get_statistics(self) -> dict[str, Any]:
"""获取统计信息"""
uptime = time.time() - self.stats["start_time"]
return {
"is_running": self.is_running,
"active_schedules": len(self.stream_schedules),
"total_schedules_created": self.stats["total_schedules_created"],
"total_schedules_cancelled": self.stats["total_schedules_cancelled"],
"total_interruptions": self.stats["total_interruptions"],
"total_process_cycles": self.stats["total_process_cycles"],
"total_failures": self.stats["total_failures"],
"uptime": uptime,
}
# 全局实例
scheduler_dispatcher = SchedulerDispatcher()

View File

@@ -204,8 +204,8 @@ class ChatterActionManager:
action_prompt_display=reason,
)
else:
asyncio.create_task( # noqa: RUF006
database_api.store_action_info(
# 改为同步等待,确保存储完成
await database_api.store_action_info(
chat_stream=chat_stream,
action_build_into_prompt=False,
action_prompt_display=reason,
@@ -214,10 +214,9 @@ class ChatterActionManager:
action_data={"reason": reason},
action_name="no_reply",
)
)
# 自动清空所有未读消息
asyncio.create_task(self._clear_all_unread_messages(chat_stream.stream_id, "no_reply")) # noqa: RUF006
# 自动清空所有未读消息(改为同步等待)
await self._clear_all_unread_messages(chat_stream.stream_id, "no_reply")
return {"action_type": "no_reply", "success": True, "reply_text": "", "command": ""}
@@ -233,16 +232,14 @@ class ChatterActionManager:
target_message,
)
# 记录执行的动作到目标消息
# 记录执行的动作到目标消息(改为同步等待)
if success:
asyncio.create_task( # noqa: RUF006
self._record_action_to_message(chat_stream, action_name, target_message, action_data)
)
await self._record_action_to_message(chat_stream, action_name, target_message, action_data)
# 自动清空所有未读消息
if clear_unread_messages:
asyncio.create_task(self._clear_all_unread_messages(chat_stream.stream_id, action_name)) # noqa: RUF006
await self._clear_all_unread_messages(chat_stream.stream_id, action_name)
# 重置打断计数
asyncio.create_task(self._reset_interruption_count_after_action(chat_stream.stream_id)) # noqa: RUF006
await self._reset_interruption_count_after_action(chat_stream.stream_id)
return {
"action_type": action_name,
@@ -292,14 +289,14 @@ class ChatterActionManager:
should_quote_reply, # 传递should_quote_reply参数
)
# 记录回复动作到目标消息
asyncio.create_task(self._record_action_to_message(chat_stream, "reply", target_message, action_data)) # noqa: RUF006
# 记录回复动作到目标消息(改为同步等待)
await self._record_action_to_message(chat_stream, "reply", target_message, action_data)
if clear_unread_messages:
asyncio.create_task(self._clear_all_unread_messages(chat_stream.stream_id, "reply")) # noqa: RUF006
await self._clear_all_unread_messages(chat_stream.stream_id, "reply")
# 回复成功,重置打断计数
asyncio.create_task(self._reset_interruption_count_after_action(chat_stream.stream_id)) # noqa: RUF006
# 回复成功,重置打断计数(改为同步等待)
await self._reset_interruption_count_after_action(chat_stream.stream_id)
return {"action_type": "reply", "success": True, "reply_text": reply_text, "loop_info": loop_info}

View File

@@ -81,6 +81,7 @@ class UnifiedScheduler:
self._check_task: asyncio.Task | None = None
self._lock = asyncio.Lock()
self._event_subscriptions: set[str] = set() # 追踪已订阅的事件
self._executing_tasks: dict[str, asyncio.Task] = {} # 追踪正在执行的任务
async def _handle_event_trigger(self, event_name: str | EventType, event_params: dict[str, Any]) -> None:
"""处理来自 event_manager 的事件通知
@@ -182,9 +183,20 @@ class UnifiedScheduler:
except ImportError:
pass
# 取消所有正在执行的任务
executing_tasks = list(self._executing_tasks.values())
if executing_tasks:
logger.debug(f"取消 {len(executing_tasks)} 个正在执行的任务")
for task in executing_tasks:
if not task.done():
task.cancel()
# 等待所有任务取消完成
await asyncio.gather(*executing_tasks, return_exceptions=True)
logger.info("统一调度器已停止")
self._tasks.clear()
self._event_subscriptions.clear()
self._executing_tasks.clear()
async def _check_loop(self):
"""主循环:每秒检查一次所有任务"""
@@ -202,7 +214,7 @@ class UnifiedScheduler:
async def _check_and_trigger_tasks(self):
"""检查并触发到期任务
注意:为了避免死锁,回调执行必须在锁外进行
注意:为了避免死锁和阻塞,回调执行必须在锁外并且并发进行
"""
current_time = datetime.now()
@@ -221,12 +233,56 @@ class UnifiedScheduler:
except Exception as e:
logger.error(f"检查任务 {task.task_name} 时发生错误: {e}", exc_info=True)
# 第二阶段:在锁外执行回调(避免死锁)
tasks_to_remove = []
# 第二阶段:在锁外并发执行所有回调(避免死锁和阻塞
if not tasks_to_trigger:
return
# 为每个任务创建独立的异步任务,确保并发执行
execution_tasks = []
for task in tasks_to_trigger:
execution_task = asyncio.create_task(
self._execute_task_callback(task, current_time),
name=f"execute_{task.task_name}"
)
execution_tasks.append(execution_task)
# 追踪正在执行的任务,以便在 remove_schedule 时可以取消
self._executing_tasks[task.schedule_id] = execution_task
# 等待所有任务完成(使用 return_exceptions=True 避免单个任务失败影响其他任务)
results = await asyncio.gather(*execution_tasks, return_exceptions=True)
# 清理执行追踪
for task in tasks_to_trigger:
self._executing_tasks.pop(task.schedule_id, None)
# 第三阶段:收集需要移除的任务并在锁内移除
tasks_to_remove = []
for task, result in zip(tasks_to_trigger, results):
if isinstance(result, Exception):
logger.error(f"[调度器] 执行任务 {task.task_name} 时发生错误: {result}", exc_info=result)
elif result is True and not task.is_recurring:
# 成功执行且是一次性任务,标记为删除
tasks_to_remove.append(task.schedule_id)
logger.debug(f"[调度器] 一次性任务 {task.task_name} 已完成,将被移除")
if tasks_to_remove:
async with self._lock:
for schedule_id in tasks_to_remove:
await self._remove_task_internal(schedule_id)
async def _execute_task_callback(self, task: ScheduleTask, current_time: datetime) -> bool:
"""执行单个任务的回调(用于并发执行)
Args:
task: 要执行的任务
current_time: 当前时间
Returns:
bool: 执行是否成功
"""
try:
logger.debug(f"[调度器] 触发定时任务: {task.task_name}")
logger.debug(f"[调度器] 触发任务: {task.task_name}")
# 执行回调
await self._execute_callback(task)
@@ -235,19 +291,12 @@ class UnifiedScheduler:
task.last_triggered_at = current_time
task.trigger_count += 1
# 如果不是循环任务,标记为删除
if not task.is_recurring:
tasks_to_remove.append(task.schedule_id)
logger.debug(f"[调度器] 一次性任务 {task.task_name} 已完成,将被移除")
logger.debug(f"[调度器] 任务 {task.task_name} 执行完成")
return True
except Exception as e:
logger.error(f"[调度器] 执行任务 {task.task_name} 时发生错误: {e}", exc_info=True)
# 第三阶段:在锁内移除已完成的任务
if tasks_to_remove:
async with self._lock:
for schedule_id in tasks_to_remove:
await self._remove_task_internal(schedule_id)
return False
async def _should_trigger_task(self, task: ScheduleTask, current_time: datetime) -> bool:
"""判断任务是否应该触发"""
@@ -375,13 +424,25 @@ class UnifiedScheduler:
return schedule_id
async def remove_schedule(self, schedule_id: str) -> bool:
"""移除调度任务"""
"""移除调度任务
如果任务正在执行,会取消执行中的任务
"""
async with self._lock:
if schedule_id not in self._tasks:
logger.warning(f"尝试移除不存在的任务: {schedule_id}")
return False
task = self._tasks[schedule_id]
# 检查是否有正在执行的任务
executing_task = self._executing_tasks.get(schedule_id)
if executing_task and not executing_task.done():
logger.debug(f"取消正在执行的任务: {task.task_name}")
executing_task.cancel()
# 不需要等待,让它在后台取消
self._executing_tasks.pop(schedule_id, None)
await self._remove_task_internal(schedule_id)
logger.debug(f"移除调度任务: {task.task_name}")
return True