This commit is contained in:
明天好像没什么
2025-11-07 21:01:45 +08:00
parent 80b040da2f
commit c8d7c09625
49 changed files with 854 additions and 872 deletions

View File

@@ -9,8 +9,8 @@ from .scheduler_dispatcher import SchedulerDispatcher, scheduler_dispatcher
__all__ = [
"MessageManager",
"SingleStreamContextManager",
"SchedulerDispatcher",
"SingleStreamContextManager",
"message_manager",
"scheduler_dispatcher",
]

View File

@@ -73,7 +73,7 @@ class SingleStreamContextManager:
cache_enabled = global_config.chat.enable_message_cache
use_cache_system = message_manager.is_running and cache_enabled
if not cache_enabled:
logger.debug(f"消息缓存系统已在配置中禁用")
logger.debug("消息缓存系统已在配置中禁用")
except Exception as e:
logger.debug(f"MessageManager不可用使用直接添加: {e}")
use_cache_system = False
@@ -129,13 +129,13 @@ class SingleStreamContextManager:
await self._calculate_message_interest(message)
self.total_messages += 1
self.last_access_time = time.time()
logger.debug(f"添加消息{message.processed_plain_text}到单流上下文: {self.stream_id}")
return True
# 不应该到达这里,但为了类型检查添加返回值
return True
except Exception as e:
logger.error(f"添加消息到单流上下文失败 {self.stream_id}: {e}", exc_info=True)
return False

View File

@@ -4,13 +4,11 @@
"""
import asyncio
import random
import time
from collections import defaultdict, deque
from typing import TYPE_CHECKING, Any
from src.chat.chatter_manager import ChatterManager
from src.chat.message_receive.chat_stream import ChatStream
from src.chat.planner_actions.action_manager import ChatterActionManager
from src.common.data_models.database_data_model import DatabaseMessages
from src.common.data_models.message_manager_data_model import MessageManagerStats, StreamStats
@@ -77,7 +75,7 @@ class MessageManager:
# 启动基于 scheduler 的消息分发器
await scheduler_dispatcher.start()
scheduler_dispatcher.set_chatter_manager(self.chatter_manager)
# 保留旧的流循环管理器(暂时)以便平滑过渡
# TODO: 在确认新机制稳定后移除
# await stream_loop_manager.start()
@@ -108,7 +106,7 @@ class MessageManager:
# 停止基于 scheduler 的消息分发器
await scheduler_dispatcher.stop()
# 停止旧的流循环管理器(如果启用)
# await stream_loop_manager.stop()
@@ -116,7 +114,7 @@ class MessageManager:
async def add_message(self, stream_id: str, message: DatabaseMessages):
"""添加消息到指定聊天流
新的流程:
1. 检查 notice 消息
2. 将消息添加到上下文(缓存)
@@ -149,10 +147,10 @@ class MessageManager:
if not chat_stream:
logger.warning(f"MessageManager.add_message: 聊天流 {stream_id} 不存在")
return
# 将消息添加到上下文
await chat_stream.context_manager.add_message(message)
# 通知 scheduler_dispatcher 处理消息接收事件
# dispatcher 会检查是否需要打断、创建或更新 schedule
await scheduler_dispatcher.on_message_received(stream_id)

View File

@@ -20,7 +20,7 @@ logger = get_logger("scheduler_dispatcher")
class SchedulerDispatcher:
"""基于 scheduler 的消息分发器
工作流程:
1. 接收消息时,将消息添加到聊天流上下文
2. 检查是否有活跃的 schedule如果没有则创建
@@ -32,13 +32,13 @@ class SchedulerDispatcher:
def __init__(self):
# 追踪每个流的 schedule_id
self.stream_schedules: dict[str, str] = {} # stream_id -> schedule_id
# 用于保护 schedule 创建/删除的锁,避免竞态条件
self.schedule_locks: dict[str, asyncio.Lock] = {} # stream_id -> Lock
# Chatter 管理器
self.chatter_manager: ChatterManager | None = None
# 统计信息
self.stats = {
"total_schedules_created": 0,
@@ -48,9 +48,9 @@ class SchedulerDispatcher:
"total_failures": 0,
"start_time": time.time(),
}
self.is_running = False
logger.info("基于 Scheduler 的消息分发器初始化完成")
async def start(self) -> None:
@@ -58,7 +58,7 @@ class SchedulerDispatcher:
if self.is_running:
logger.warning("分发器已在运行")
return
self.is_running = True
logger.info("基于 Scheduler 的消息分发器已启动")
@@ -66,9 +66,9 @@ class SchedulerDispatcher:
"""停止分发器"""
if not self.is_running:
return
self.is_running = False
# 取消所有活跃的 schedule
schedule_ids = list(self.stream_schedules.values())
for schedule_id in schedule_ids:
@@ -76,7 +76,7 @@ class SchedulerDispatcher:
await unified_scheduler.remove_schedule(schedule_id)
except Exception as e:
logger.error(f"移除 schedule {schedule_id} 失败: {e}")
self.stream_schedules.clear()
logger.info("基于 Scheduler 的消息分发器已停止")
@@ -84,7 +84,7 @@ class SchedulerDispatcher:
"""设置 Chatter 管理器"""
self.chatter_manager = chatter_manager
logger.debug(f"设置 Chatter 管理器: {chatter_manager.__class__.__name__}")
def _get_schedule_lock(self, stream_id: str) -> asyncio.Lock:
"""获取流的 schedule 锁"""
if stream_id not in self.schedule_locks:
@@ -93,40 +93,40 @@ class SchedulerDispatcher:
async def on_message_received(self, stream_id: str) -> None:
"""消息接收时的处理逻辑
Args:
stream_id: 聊天流ID
"""
if not self.is_running:
logger.warning("分发器未运行,忽略消息")
return
try:
# 1. 获取流上下文
context = await self._get_stream_context(stream_id)
if not context:
logger.warning(f"无法获取流上下文: {stream_id}")
return
# 2. 检查是否有活跃的 schedule
has_active_schedule = stream_id in self.stream_schedules
if not has_active_schedule:
# 4. 创建新的 schedule在锁内避免重复创建
await self._create_schedule(stream_id, context)
return
# 3. 检查打断判定
if has_active_schedule:
should_interrupt = await self._check_interruption(stream_id, context)
if should_interrupt:
# 移除旧 schedule 并创建新的(内部有锁保护)
await self._cancel_and_recreate_schedule(stream_id, context)
logger.debug(f"⚡ 打断成功: 流={stream_id[:8]}..., 已重新创建 schedule")
else:
logger.debug(f"打断判定失败,保持原有 schedule: 流={stream_id[:8]}...")
except Exception as e:
logger.error(f"处理消息接收事件失败 {stream_id}: {e}", exc_info=True)
@@ -144,18 +144,18 @@ class SchedulerDispatcher:
async def _check_interruption(self, stream_id: str, context: StreamContext) -> bool:
"""检查是否应该打断当前处理
Args:
stream_id: 流ID
context: 流上下文
Returns:
bool: 是否应该打断
"""
# 检查是否启用打断
if not global_config.chat.interruption_enabled:
return False
# 检查是否正在回复,以及是否允许在回复时打断
if context.is_replying:
if not global_config.chat.allow_reply_interruption:
@@ -163,49 +163,49 @@ class SchedulerDispatcher:
return False
else:
logger.debug(f"聊天流 {stream_id} 正在回复中,但配置允许回复时打断")
# 只有当 Chatter 真正在处理时才检查打断
if not context.is_chatter_processing:
logger.debug(f"聊天流 {stream_id} Chatter 未在处理,无需打断")
return False
# 检查最后一条消息
last_message = context.get_last_message()
if not last_message:
return False
# 检查是否为表情包消息
if last_message.is_picid or last_message.is_emoji:
logger.info(f"消息 {last_message.message_id} 是表情包或Emoji跳过打断检查")
return False
# 检查触发用户ID
triggering_user_id = context.triggering_user_id
if triggering_user_id and last_message.user_info.user_id != triggering_user_id:
logger.info(f"消息来自非触发用户 {last_message.user_info.user_id},实际触发用户为 {triggering_user_id},跳过打断检查")
return False
# 检查是否已达到最大打断次数
if context.interruption_count >= global_config.chat.interruption_max_limit:
logger.debug(
f"聊天流 {stream_id} 已达到最大打断次数 {context.interruption_count}/{global_config.chat.interruption_max_limit}"
)
return False
# 计算打断概率
interruption_probability = context.calculate_interruption_probability(
global_config.chat.interruption_max_limit
)
# 根据概率决定是否打断
import random
if random.random() < interruption_probability:
logger.debug(f"聊天流 {stream_id} 触发消息打断,打断概率: {interruption_probability:.2f}")
# 增加打断计数
await context.increment_interruption_count()
self.stats["total_interruptions"] += 1
# 检查是否已达到最大次数
if context.interruption_count >= global_config.chat.interruption_max_limit:
logger.warning(
@@ -215,7 +215,7 @@ class SchedulerDispatcher:
logger.info(
f"聊天流 {stream_id} 已打断,当前打断次数: {context.interruption_count}/{global_config.chat.interruption_max_limit}"
)
return True
else:
logger.debug(f"聊天流 {stream_id} 未触发打断,打断概率: {interruption_probability:.2f}")
@@ -223,7 +223,7 @@ class SchedulerDispatcher:
async def _cancel_and_recreate_schedule(self, stream_id: str, context: StreamContext) -> None:
"""取消旧的 schedule 并创建新的(打断模式,使用极短延迟)
Args:
stream_id: 流ID
context: 流上下文
@@ -244,13 +244,13 @@ class SchedulerDispatcher:
)
# 移除失败,不创建新 schedule避免重复
return
# 创建新的 schedule使用即时处理模式极短延迟
await self._create_schedule(stream_id, context, immediate_mode=True)
async def _create_schedule(self, stream_id: str, context: StreamContext, immediate_mode: bool = False) -> None:
"""为聊天流创建新的 schedule
Args:
stream_id: 流ID
context: 流上下文
@@ -266,7 +266,7 @@ class SchedulerDispatcher:
)
await unified_scheduler.remove_schedule(old_schedule_id)
del self.stream_schedules[stream_id]
# 如果是即时处理模式打断时使用固定的1秒延迟立即重新处理
if immediate_mode:
delay = 1.0 # 硬编码1秒延迟确保打断后能快速重新处理
@@ -277,10 +277,10 @@ class SchedulerDispatcher:
else:
# 常规模式:计算初始延迟
delay = await self._calculate_initial_delay(stream_id, context)
# 获取未读消息数量用于日志
unread_count = len(context.unread_messages) if context.unread_messages else 0
# 创建 schedule
schedule_id = await unified_scheduler.create_schedule(
callback=self._on_schedule_triggered,
@@ -290,41 +290,41 @@ class SchedulerDispatcher:
task_name=f"dispatch_{stream_id[:8]}",
callback_args=(stream_id,),
)
# 追踪 schedule
self.stream_schedules[stream_id] = schedule_id
self.stats["total_schedules_created"] += 1
mode_indicator = "⚡打断" if immediate_mode else "📅常规"
logger.info(
f"{mode_indicator} 创建 schedule: 流={stream_id[:8]}..., "
f"延迟={delay:.3f}s, 未读={unread_count}, "
f"ID={schedule_id[:8]}..."
)
except Exception as e:
logger.error(f"创建 schedule 失败 {stream_id}: {e}", exc_info=True)
async def _calculate_initial_delay(self, stream_id: str, context: StreamContext) -> float:
"""计算初始延迟时间
Args:
stream_id: 流ID
context: 流上下文
Returns:
float: 延迟时间(秒)
"""
# 基础间隔
base_interval = getattr(global_config.chat, "distribution_interval", 5.0)
# 检查是否有未读消息
unread_count = len(context.unread_messages) if context.unread_messages else 0
# 强制分发阈值
force_dispatch_threshold = getattr(global_config.chat, "force_dispatch_unread_threshold", 20)
# 如果未读消息过多,使用最小间隔
if force_dispatch_threshold and unread_count > force_dispatch_threshold:
min_interval = getattr(global_config.chat, "force_dispatch_min_interval", 0.1)
@@ -334,24 +334,24 @@ class SchedulerDispatcher:
f"使用最小间隔={min_interval}s"
)
return min_interval
# 尝试使用能量管理器计算间隔
try:
# 更新能量值
await self._update_stream_energy(stream_id, context)
# 获取当前 focus_energy
focus_energy = energy_manager.energy_cache.get(stream_id, (0.5, 0))[0]
# 使用能量管理器计算间隔
interval = energy_manager.get_distribution_interval(focus_energy)
logger.info(
f"📊 动态间隔计算: 流={stream_id[:8]}..., "
f"能量={focus_energy:.3f}, 间隔={interval:.2f}s"
)
return interval
except Exception as e:
logger.info(
f"📊 使用默认间隔: 流={stream_id[:8]}..., "
@@ -361,96 +361,96 @@ class SchedulerDispatcher:
async def _update_stream_energy(self, stream_id: str, context: StreamContext) -> None:
"""更新流的能量值
Args:
stream_id: 流ID
context: 流上下文
"""
try:
from src.chat.message_receive.chat_stream import get_chat_manager
# 获取聊天流
chat_manager = get_chat_manager()
chat_stream = await chat_manager.get_stream(stream_id)
if not chat_stream:
logger.debug(f"无法找到聊天流 {stream_id},跳过能量更新")
return
# 合并未读消息和历史消息
all_messages = []
# 添加历史消息
history_messages = context.get_history_messages(limit=global_config.chat.max_context_size)
all_messages.extend(history_messages)
# 添加未读消息
unread_messages = context.get_unread_messages()
all_messages.extend(unread_messages)
# 按时间排序并限制数量
all_messages.sort(key=lambda m: m.time)
messages = all_messages[-global_config.chat.max_context_size:]
# 获取用户ID
user_id = context.triggering_user_id
# 使用能量管理器计算并缓存能量值
energy = await energy_manager.calculate_focus_energy(
stream_id=stream_id,
messages=messages,
user_id=user_id
)
# 同步更新到 ChatStream
chat_stream._focus_energy = energy
logger.debug(f"已更新流 {stream_id} 的能量值: {energy:.3f}")
except Exception as e:
logger.warning(f"更新流能量失败 {stream_id}: {e}", exc_info=False)
async def _on_schedule_triggered(self, stream_id: str) -> None:
"""schedule 触发时的回调
Args:
stream_id: 流ID
"""
try:
old_schedule_id = self.stream_schedules.get(stream_id)
logger.info(
f"⏰ Schedule 触发: 流={stream_id[:8]}..., "
f"ID={old_schedule_id[:8] if old_schedule_id else 'None'}..., "
f"开始处理消息"
)
# 获取流上下文
context = await self._get_stream_context(stream_id)
if not context:
logger.warning(f"Schedule 触发时无法获取流上下文: {stream_id}")
return
# 检查是否有未读消息
if not context.unread_messages:
logger.debug(f"{stream_id} 没有未读消息,跳过处理")
return
# 激活 chatter 处理(不需要锁,允许并发处理)
success = await self._process_stream(stream_id, context)
# 更新统计
self.stats["total_process_cycles"] += 1
if not success:
self.stats["total_failures"] += 1
self.stream_schedules.pop(stream_id, None)
# 检查缓存中是否有待处理的消息
from src.chat.message_manager.message_manager import message_manager
has_cached = message_manager.has_cached_messages(stream_id)
if has_cached:
# 有缓存消息,立即创建新 schedule 继续处理
logger.info(
@@ -464,60 +464,60 @@ class SchedulerDispatcher:
f"✅ 处理完成且无缓存消息: 流={stream_id[:8]}..., "
f"等待新消息到达"
)
except Exception as e:
logger.error(f"Schedule 回调执行失败 {stream_id}: {e}", exc_info=True)
async def _process_stream(self, stream_id: str, context: StreamContext) -> bool:
"""处理流消息
Args:
stream_id: 流ID
context: 流上下文
Returns:
bool: 是否处理成功
"""
if not self.chatter_manager:
logger.warning(f"Chatter 管理器未设置: {stream_id}")
return False
# 设置处理状态
self._set_stream_processing_status(stream_id, True)
try:
start_time = time.time()
# 设置触发用户ID
last_message = context.get_last_message()
if last_message:
context.triggering_user_id = last_message.user_info.user_id
# 创建异步任务刷新能量(不阻塞主流程)
energy_task = asyncio.create_task(self._refresh_focus_energy(stream_id))
# 设置 Chatter 正在处理的标志
context.is_chatter_processing = True
logger.debug(f"设置 Chatter 处理标志: {stream_id}")
try:
# 调用 chatter_manager 处理流上下文
results = await self.chatter_manager.process_stream_context(stream_id, context)
success = results.get("success", False)
if success:
process_time = time.time() - start_time
logger.debug(f"流处理成功: {stream_id} (耗时: {process_time:.2f}s)")
else:
logger.warning(f"流处理失败: {stream_id} - {results.get('error_message', '未知错误')}")
return success
finally:
# 清除 Chatter 处理标志
context.is_chatter_processing = False
logger.debug(f"清除 Chatter 处理标志: {stream_id}")
# 等待能量刷新任务完成
try:
await asyncio.wait_for(energy_task, timeout=5.0)
@@ -525,11 +525,11 @@ class SchedulerDispatcher:
logger.warning(f"等待能量刷新超时: {stream_id}")
except Exception as e:
logger.debug(f"能量刷新任务异常: {e}")
except Exception as e:
logger.error(f"流处理异常: {stream_id} - {e}", exc_info=True)
return False
finally:
# 设置处理状态为未处理
self._set_stream_processing_status(stream_id, False)
@@ -538,11 +538,11 @@ class SchedulerDispatcher:
"""设置流的处理状态"""
try:
from src.chat.message_manager.message_manager import message_manager
if message_manager.is_running:
message_manager.set_stream_processing_status(stream_id, is_processing)
logger.debug(f"设置流处理状态: stream={stream_id}, processing={is_processing}")
except ImportError:
logger.debug("MessageManager 不可用,跳过状态设置")
except Exception as e:
@@ -556,7 +556,7 @@ class SchedulerDispatcher:
if not chat_stream:
logger.debug(f"刷新能量时未找到聊天流: {stream_id}")
return
await chat_stream.context_manager.refresh_focus_energy_from_history()
logger.debug(f"已刷新聊天流 {stream_id} 的聚焦能量")
except Exception as e: