refactor(chat): 简化任务管理架构,移除多重回复支持

- 移除 ChatterManager 中的复杂任务追踪逻辑(_processing_tasks)
- 将流循环任务管理从 StreamLoopManager 转移到 StreamContext
- 简化消息打断机制,通过取消 stream_loop_task 实现
- 移除多重回复相关功能,统一使用单一任务管理
- 优化错误处理和资源清理逻辑

BREAKING CHANGE: 移除了多重回复功能,所有流处理现在使用单一任务架构
This commit is contained in:
Windpicker-owo
2025-10-27 17:16:36 +08:00
parent 0d3e46fc97
commit a2c0afa75d
4 changed files with 117 additions and 290 deletions

View File

@@ -16,8 +16,6 @@ class ChatterManager:
self.action_manager = action_manager
self.chatter_classes: dict[ChatType, list[type]] = {}
self.instances: dict[str, BaseChatter] = {}
# 🌟 优化:统一任务追踪,支持多重回复
self._processing_tasks: dict[str, list[asyncio.Task]] = {}
# 管理器统计
self.stats = {
@@ -153,22 +151,21 @@ class ChatterManager:
except asyncio.CancelledError:
self.stats["failed_executions"] += 1
logger.info(f"{stream_id} 处理被取消,不清空未读消息")
context.triggering_user_id = None # 清除触发用户ID
raise
except Exception as e:
self.stats["failed_executions"] += 1
logger.error(f"处理流 {stream_id} 时发生错误: {e}")
context.triggering_user_id = None # 清除触发用户ID
raise
finally:
# 无论成功还是失败,都要清理处理任务记录
self.remove_processing_task(stream_id)
context.triggering_user_id = None # 清除触发用户ID
# 清除触发用户ID所有情况下都需要
context.triggering_user_id = None
def get_stats(self) -> dict[str, Any]:
"""获取管理器统计信息"""
stats = self.stats.copy()
stats["active_instances"] = len(self.instances)
stats["registered_chatter_types"] = len(self.chatter_classes)
stats["active_processing_tasks"] = len(self.get_active_processing_tasks())
return stats
def reset_stats(self):
@@ -179,138 +176,3 @@ class ChatterManager:
"successful_executions": 0,
"failed_executions": 0,
}
def set_processing_task(self, stream_id: str, task: asyncio.Task):
"""设置流的主要处理任务"""
if stream_id not in self._processing_tasks:
self._processing_tasks[stream_id] = []
self._processing_tasks[stream_id].insert(0, task) # 主要任务放在第一位
logger.debug(f"设置流 {stream_id} 的主要处理任务")
def get_processing_task(self, stream_id: str) -> asyncio.Task | None:
"""获取流的主要处理任务"""
tasks = self._processing_tasks.get(stream_id, [])
return tasks[0] if tasks and not tasks[0].done() else None
def add_processing_task(self, stream_id: str, task: asyncio.Task):
"""添加处理任务到流(支持多重回复)"""
if stream_id not in self._processing_tasks:
self._processing_tasks[stream_id] = []
self._processing_tasks[stream_id].append(task)
logger.debug(f"添加处理任务到流 {stream_id},当前任务数: {len(self._processing_tasks[stream_id])}")
def get_all_processing_tasks(self, stream_id: str) -> list[asyncio.Task]:
"""获取流的所有活跃处理任务"""
if stream_id not in self._processing_tasks:
return []
# 清理已完成的任务并返回活跃任务
active_tasks = [task for task in self._processing_tasks[stream_id] if not task.done()]
self._processing_tasks[stream_id] = active_tasks
if len(active_tasks) == 0:
del self._processing_tasks[stream_id]
return active_tasks
def cancel_all_stream_tasks(self, stream_id: str) -> int:
"""取消指定流的所有处理任务(包括多重回复)
Args:
stream_id: 流ID
Returns:
int: 成功取消的任务数量
"""
if stream_id not in self._processing_tasks:
return 0
tasks = self._processing_tasks[stream_id]
cancelled_count = 0
logger.info(f"开始取消流 {stream_id} 的所有处理任务,共 {len(tasks)}")
for task in tasks:
try:
if not task.done():
task.cancel()
cancelled_count += 1
logger.debug(f"成功取消任务 {task.get_name() if hasattr(task, 'get_name') else 'unnamed'}")
except Exception as e:
logger.warning(f"取消任务时出错: {e}")
# 清理任务记录
del self._processing_tasks[stream_id]
logger.info(f"{stream_id} 的任务取消完成,成功取消 {cancelled_count} 个任务")
return cancelled_count
def cancel_processing_task(self, stream_id: str) -> bool:
"""取消流的主要处理任务
Args:
stream_id: 流ID
Returns:
bool: 是否成功取消了任务
"""
main_task = self.get_processing_task(stream_id)
if main_task and not main_task.done():
try:
main_task.cancel()
logger.info(f"已取消流 {stream_id} 的主要处理任务")
return True
except Exception as e:
logger.warning(f"取消流 {stream_id} 的主要处理任务时出错: {e}")
return False
return False
def remove_processing_task(self, stream_id: str) -> None:
"""移除流的处理任务记录
Args:
stream_id: 流ID
"""
if stream_id in self._processing_tasks:
del self._processing_tasks[stream_id]
logger.debug(f"已移除流 {stream_id} 的所有处理任务记录")
def get_active_processing_tasks(self) -> dict[str, asyncio.Task]:
"""获取所有活跃的主要处理任务
Returns:
Dict[str, asyncio.Task]: 流ID到主要处理任务的映射
"""
# 过滤掉已完成的任务,只返回主要任务
active_tasks = {}
for stream_id, task_list in list(self._processing_tasks.items()):
if task_list:
main_task = task_list[0] # 获取主要任务
if not main_task.done():
active_tasks[stream_id] = main_task
else:
# 清理已完成的主要任务
task_list = [t for t in task_list if not t.done()]
if task_list:
self._processing_tasks[stream_id] = task_list
active_tasks[stream_id] = task_list[0] # 新的主要任务
else:
del self._processing_tasks[stream_id]
logger.debug(f"清理已完成的处理任务: {stream_id}")
return active_tasks
async def cancel_all_processing_tasks(self) -> int:
"""取消所有活跃的处理任务
Returns:
int: 成功取消的任务数量
"""
active_tasks = self.get_active_processing_tasks()
cancelled_count = 0
for stream_id in active_tasks.keys():
if self.cancel_processing_task(stream_id):
cancelled_count += 1
logger.info(f"已取消 {cancelled_count} 个活跃处理任务")
return cancelled_count

View File

@@ -22,9 +22,6 @@ class StreamLoopManager:
"""流循环管理器 - 每个流一个独立的无限循环任务"""
def __init__(self, max_concurrent_streams: int | None = None):
# 流循环任务管理
self.stream_loops: dict[str, asyncio.Task] = {}
# 统计信息
self.stats: dict[str, Any] = {
"active_streams": 0,
@@ -68,12 +65,19 @@ class StreamLoopManager:
# 取消所有流循环
try:
# 获取所有活跃的流
from src.plugin_system.apis.chat_api import get_chat_manager
chat_manager = get_chat_manager()
all_streams = await chat_manager.get_all_streams()
# 创建任务列表以便并发取消
cancel_tasks = []
for stream_id, task in list(self.stream_loops.items()):
if not task.done():
task.cancel()
cancel_tasks.append((stream_id, task))
for chat_stream in all_streams:
context = chat_stream.context_manager.context
if context.stream_loop_task and not context.stream_loop_task.done():
context.stream_loop_task.cancel()
cancel_tasks.append((chat_stream.stream_id, context.stream_loop_task))
# 并发等待所有任务取消
if cancel_tasks:
@@ -83,15 +87,6 @@ class StreamLoopManager:
return_exceptions=True,
)
# 取消所有活跃的 chatter 处理任务
if self.chatter_manager:
try:
cancelled_count = await self.chatter_manager.cancel_all_processing_tasks()
logger.info(f"已取消 {cancelled_count} 个活跃的 chatter 处理任务")
except Exception as e:
logger.error(f"取消 chatter 处理任务时出错: {e}")
self.stream_loops.clear()
logger.info("所有流循环已清理")
except Exception as e:
logger.error(f"停止管理器时出错: {e}")
@@ -108,8 +103,14 @@ class StreamLoopManager:
Returns:
bool: 是否成功启动
"""
# 获取流上下文
context = await self._get_stream_context(stream_id)
if not context:
logger.warning(f"无法获取流上下文: {stream_id}")
return False
# 快速路径:如果流已存在,无需处理
if stream_id in self.stream_loops:
if context.stream_loop_task and not context.stream_loop_task.done():
logger.debug(f"{stream_id} 循环已在运行")
return True
@@ -141,7 +142,10 @@ class StreamLoopManager:
# 创建流循环任务
try:
loop_task = asyncio.create_task(self._stream_loop_worker(stream_id), name=f"stream_loop_{stream_id}")
self.stream_loops[stream_id] = loop_task
# 将任务记录到 StreamContext 中
context.stream_loop_task = loop_task
# 更新统计信息
self.stats["active_streams"] += 1
self.stats["total_loops"] += 1
@@ -189,12 +193,18 @@ class StreamLoopManager:
Returns:
bool: 是否成功停止
"""
# 快速路径:如果流不存在,无需处理
if stream_id not in self.stream_loops:
logger.debug(f"{stream_id} 循环不存在,无需停止")
# 获取流上下文
context = await self._get_stream_context(stream_id)
if not context:
logger.debug(f"{stream_id} 上下文不存在,无需停止")
return False
task = self.stream_loops[stream_id]
# 检查是否有 stream_loop_task
if not context.stream_loop_task or context.stream_loop_task.done():
logger.debug(f"{stream_id} 循环不存在或已结束,无需停止")
return False
task = context.stream_loop_task
if not task.done():
task.cancel()
try:
@@ -207,14 +217,10 @@ class StreamLoopManager:
except Exception as e:
logger.error(f"等待流循环任务结束时出错: {stream_id} - {e}")
# 取消关联的 chatter 处理任务
if self.chatter_manager:
cancelled = self.chatter_manager.cancel_processing_task(stream_id)
if cancelled:
logger.info(f"已取消关联的 chatter 处理任务: {stream_id}")
del self.stream_loops[stream_id]
logger.info(f"停止流循环: {stream_id} (剩余: {len(self.stream_loops)})")
# 清空 StreamContext 中的任务记录
context.stream_loop_task = None
logger.info(f"停止流循环: {stream_id}")
return True
async def _stream_loop_worker(self, stream_id: str) -> None:
@@ -277,13 +283,6 @@ class StreamLoopManager:
except asyncio.CancelledError:
logger.info(f"流循环被取消: {stream_id}")
if self.chatter_manager:
# 使用 ChatterManager 的新方法取消处理任务
cancelled = self.chatter_manager.cancel_processing_task(stream_id)
if cancelled:
logger.info(f"成功取消 chatter 处理任务: {stream_id}")
else:
logger.debug(f"没有需要取消的 chatter 处理任务: {stream_id}")
break
except Exception as e:
logger.error(f"流循环出错 {stream_id}: {e}", exc_info=True)
@@ -291,10 +290,14 @@ class StreamLoopManager:
await asyncio.sleep(5.0) # 错误时等待5秒再重试
finally:
# 清理循环标记
if stream_id in self.stream_loops:
del self.stream_loops[stream_id]
logger.debug(f"清理流循环标记: {stream_id}")
# 清理 StreamContext 中的任务记录
try:
context = await self._get_stream_context(stream_id)
if context and context.stream_loop_task:
context.stream_loop_task = None
logger.debug(f"清理 StreamContext 中的流循环任务: {stream_id}")
except Exception as e:
logger.debug(f"清理 StreamContext 任务记录失败: {e}")
# 释放自适应管理器的槽位
try:
@@ -384,9 +387,7 @@ class StreamLoopManager:
energy_task.add_done_callback(lambda t: child_tasks.discard(t))
# 直接调用chatter_manager处理流上下文
task = asyncio.create_task(self.chatter_manager.process_stream_context(stream_id, context))
self.chatter_manager.set_processing_task(stream_id, task)
results = await task
results = await self.chatter_manager.process_stream_context(stream_id, context)
success = results.get("success", False)
if success:
@@ -509,8 +510,11 @@ class StreamLoopManager:
current_time = time.time()
uptime = current_time - self.stats["start_time"] if self.is_running else 0
# 从统计信息中获取活跃流数量
active_streams = self.stats.get("active_streams", 0)
return {
"active_streams": len(self.stream_loops),
"active_streams": active_streams,
"total_loops": self.stats["total_loops"],
"max_concurrent": self.max_concurrent_streams,
"is_running": self.is_running,
@@ -573,9 +577,12 @@ class StreamLoopManager:
# 计算吞吐量
throughput = self.stats["total_process_cycles"] / max(1, uptime / 3600) # 每小时处理次数
# 从统计信息中获取活跃流数量
active_streams = self.stats.get("active_streams", 0)
return {
"uptime_hours": uptime / 3600,
"active_streams": len(self.stream_loops),
"active_streams": active_streams,
"total_process_cycles": self.stats["total_process_cycles"],
"total_failures": self.stats["total_failures"],
"throughput_per_hour": throughput,
@@ -627,48 +634,39 @@ class StreamLoopManager:
logger.info(f"强制分发流处理: {stream_id}")
try:
# 检查是否有现有的分发循环
if stream_id in self.stream_loops:
logger.info(f"发现现有流循环 {stream_id},将先移除再重新创建")
existing_task = self.stream_loops[stream_id]
if not existing_task.done():
existing_task.cancel()
# 创建异步任务来等待取消完成,并添加异常处理
cancel_task = asyncio.create_task(
self._wait_for_task_cancel(stream_id, existing_task), name=f"cancel_existing_loop_{stream_id}"
)
# 为取消任务添加异常处理,避免孤儿任务
cancel_task.add_done_callback(
lambda task: logger.debug(f"取消任务完成: {stream_id}")
if not task.exception()
else logger.error(f"取消任务异常: {stream_id} - {task.exception()}")
)
# 从字典中移除
del self.stream_loops[stream_id]
# 获取聊天管理器和流
chat_manager = get_chat_manager()
chat_stream = await chat_manager.get_stream(stream_id)
if not chat_stream:
logger.warning(f"强制分发时未找到流: {stream_id}")
return
# 获取流上下文
context = chat_stream.context_manager.context
context = await self._get_stream_context(stream_id)
if not context:
logger.warning(f"强制分发时未找到流上下文: {stream_id}")
return
# 检查是否有现有的 stream_loop_task
if context.stream_loop_task and not context.stream_loop_task.done():
logger.info(f"发现现有流循环 {stream_id},将先取消再重新创建")
existing_task = context.stream_loop_task
existing_task.cancel()
# 创建异步任务来等待取消完成,并添加异常处理
cancel_task = asyncio.create_task(
self._wait_for_task_cancel(stream_id, existing_task), name=f"cancel_existing_loop_{stream_id}"
)
# 为取消任务添加异常处理,避免孤儿任务
cancel_task.add_done_callback(
lambda task: logger.debug(f"取消任务完成: {stream_id}")
if not task.exception()
else logger.error(f"取消任务异常: {stream_id} - {task.exception()}")
)
# 检查未读消息数量
unread_count = self._get_unread_count(context)
logger.info(f"{stream_id} 当前未读消息数: {unread_count}")
# 创建新的流循环任务
new_task = asyncio.create_task(self._stream_loop(stream_id), name=f"force_stream_loop_{stream_id}")
self.stream_loops[stream_id] = new_task
self.stats["total_loops"] += 1
logger.info(f"已创建强制分发流循环: {stream_id} (当前总数: {len(self.stream_loops)})")
# 使用 start_stream_loop 重新创建流循环任务
success = await self.start_stream_loop(stream_id, force=True)
if success:
logger.info(f"已创建强制分发流循环: {stream_id}")
else:
logger.warning(f"创建强制分发流循环失败: {stream_id}")
except Exception as e:
logger.error(f"强制分发流处理失败 {stream_id}: {e}", exc_info=True)

View File

@@ -360,7 +360,7 @@ class MessageManager:
logger.error(f"清理不活跃聊天流时发生错误: {e}")
async def _check_and_handle_interruption(self, chat_stream: ChatStream | None = None, message: DatabaseMessages | None = None):
"""检查并处理消息打断 - 支持多重回复任务取消"""
"""检查并处理消息打断 - 通过取消 stream_loop_task 实现"""
if not global_config.chat.interruption_enabled or not chat_stream or not message:
return
@@ -374,70 +374,64 @@ class MessageManager:
logger.info(f"消息 {message.message_id} 是表情包或Emoji跳过打断检查")
return
# 修复:获取所有处理任务(包括多重回复)
all_processing_tasks = self.chatter_manager.get_all_processing_tasks(chat_stream.stream_id)
if all_processing_tasks:
# 检查是否有 stream_loop_task 在运行
context = chat_stream.context_manager.context
stream_loop_task = context.stream_loop_task
if stream_loop_task and not stream_loop_task.done():
# 检查触发用户ID
triggering_user_id = chat_stream.context_manager.context.triggering_user_id
triggering_user_id = context.triggering_user_id
if triggering_user_id and message.user_info.user_id != triggering_user_id:
logger.info(f"消息来自非触发用户 {message.user_info.user_id},实际触发用户为 {triggering_user_id},跳过打断检查")
return
# 计算打断概率 - 使用新的线性概率模型
interruption_probability = chat_stream.context_manager.context.calculate_interruption_probability(
# 计算打断概率
interruption_probability = context.calculate_interruption_probability(
global_config.chat.interruption_max_limit
)
# 检查是否已达到最大打断次数
if chat_stream.context_manager.context.interruption_count >= global_config.chat.interruption_max_limit:
if context.interruption_count >= global_config.chat.interruption_max_limit:
logger.debug(
f"聊天流 {chat_stream.stream_id} 已达到最大打断次数 {chat_stream.context_manager.context.interruption_count}/{global_config.chat.interruption_max_limit},跳过打断检查"
f"聊天流 {chat_stream.stream_id} 已达到最大打断次数 {context.interruption_count}/{global_config.chat.interruption_max_limit},跳过打断检查"
)
return
# 根据概率决定是否打断
if random.random() < interruption_probability:
logger.info(f"聊天流 {chat_stream.stream_id} 触发消息打断,打断概率: {interruption_probability:.2f},检测到 {len(all_processing_tasks)} 个任务")
logger.info(f"聊天流 {chat_stream.stream_id} 触发消息打断,打断概率: {interruption_probability:.2f}")
# 修复:取消所有任务(包括多重回复)
cancelled_count = self.chatter_manager.cancel_all_stream_tasks(chat_stream.stream_id)
if cancelled_count > 0:
logger.info(f"消息打断成功取消 {cancelled_count} 个任务: {chat_stream.stream_id}")
else:
logger.warning(f"消息打断未能取消任何任务: {chat_stream.stream_id}")
# 取消 stream_loop_task子任务会通过 try-catch 自动取消
try:
stream_loop_task.cancel()
logger.info(f"已取消流循环任务: {chat_stream.stream_id}")
except Exception as e:
logger.warning(f"取消流循环任务失败: {chat_stream.stream_id} - {e}")
# 增加打断计数
await chat_stream.context_manager.context.increment_interruption_count()
await context.increment_interruption_count()
# 新增:打断后立即重新进入聊天流程
# 新增:打断后延迟重新进入聊天流程,以合并短时间内的多条消息
asyncio.create_task(self._trigger_delayed_reprocess(chat_stream, delay=0.5))
# 打断后重新创建 stream_loop 任务
await self._trigger_reprocess(chat_stream)
# 检查是否已达到最大次数
if chat_stream.context_manager.context.interruption_count >= global_config.chat.interruption_max_limit:
if context.interruption_count >= global_config.chat.interruption_max_limit:
logger.warning(
f"聊天流 {chat_stream.stream_id} 已达到最大打断次数 {chat_stream.context_manager.context.interruption_count}/{global_config.chat.interruption_max_limit},后续消息将不再打断"
f"聊天流 {chat_stream.stream_id} 已达到最大打断次数 {context.interruption_count}/{global_config.chat.interruption_max_limit},后续消息将不再打断"
)
else:
logger.info(
f"聊天流 {chat_stream.stream_id} 已打断并重新进入处理流程,当前打断次数: {chat_stream.context_manager.context.interruption_count}/{global_config.chat.interruption_max_limit}"
f"聊天流 {chat_stream.stream_id} 已打断并重新进入处理流程,当前打断次数: {context.interruption_count}/{global_config.chat.interruption_max_limit}"
)
else:
logger.debug(f"聊天流 {chat_stream.stream_id} 未触发打断,打断概率: {interruption_probability:.2f},检测到 {len(all_processing_tasks)} 个任务")
async def _trigger_delayed_reprocess(self, chat_stream: ChatStream, delay: float):
"""打断后延迟重新进入聊天流程,以合并短时间内的多条消息"""
await asyncio.sleep(delay)
await self._trigger_reprocess(chat_stream)
logger.debug(f"聊天流 {chat_stream.stream_id} 未触发打断,打断概率: {interruption_probability:.2f}")
async def _trigger_reprocess(self, chat_stream: ChatStream):
"""重新处理聊天流的核心逻辑 - 支持子任务管理"""
"""重新处理聊天流的核心逻辑 - 重新创建 stream_loop 任务"""
try:
stream_id = chat_stream.stream_id
logger.info(f"🚀 打断后立即重新处理聊天流: {stream_id}")
logger.info(f"🚀 打断后重新创建流循环任务: {stream_id}")
# 等待一小段时间确保当前消息已经添加到未读消息中
await asyncio.sleep(0.1)
@@ -451,47 +445,19 @@ class MessageManager:
logger.debug(f"💭 聊天流 {stream_id} 没有未读消息,跳过重新处理")
return
logger.info(f"💬 开始重新处理 {len(unread_messages)} 条未读消息: {stream_id}")
logger.info(f"💬 准备重新处理 {len(unread_messages)} 条未读消息: {stream_id}")
# 创建处理任务并使用try-catch实现子任务管理
task = asyncio.create_task(
self._managed_reprocess_with_cleanup(stream_id, context),
name=f"reprocess_{stream_id}_{int(time.time())}"
)
# 设置处理任务
self.chatter_manager.set_processing_task(stream_id, task)
# 不等待完成,让它异步执行
# 如果需要等待,调用者会等待 chatter_manager.process_stream_context
# 重新创建 stream_loop 任务
success = await stream_loop_manager.start_stream_loop(stream_id, force=True)
if success:
logger.info(f"✅ 成功重新创建流循环任务: {stream_id}")
else:
logger.warning(f"⚠️ 重新创建流循环任务失败: {stream_id}")
except Exception as e:
logger.error(f"🚨 触发重新处理时出错: {e}")
async def _managed_reprocess_with_cleanup(self, stream_id: str, context):
"""带清理功能的重新处理"""
child_tasks = set() # 跟踪子任务
try:
# 处理流上下文
result = await self.chatter_manager.process_stream_context(stream_id, context)
return result
except asyncio.CancelledError:
logger.info(f"重新处理任务被取消: {stream_id}")
# 取消所有子任务
for child_task in child_tasks:
if not child_task.done():
child_task.cancel()
raise
except Exception as e:
logger.error(f"重新处理任务执行出错: {stream_id} - {e}")
# 清理子任务
for child_task in child_tasks:
if not child_task.done():
child_task.cancel()
raise
async def clear_all_unread_messages(self, stream_id: str):
"""清除指定上下文中的所有未读消息,在消息处理完成后调用"""
try:

View File

@@ -40,6 +40,7 @@ class StreamContext(BaseDataModel):
last_check_time: float = field(default_factory=time.time)
is_active: bool = True
processing_task: asyncio.Task | None = None
stream_loop_task: asyncio.Task | None = None # 流循环任务
interruption_count: int = 0 # 打断计数器
last_interruption_time: float = 0.0 # 上次打断时间