fix(chat): 修复消息打断会取消正在进行的回复任务的问题
之前的消息打断逻辑会无差别地取消处理流中的所有任务。这会导致一个问题:当用户在机器人生成回复期间快速发送新消息时,回复任务会被意外中断,导致机器人无法正常完成回复。 本次修改通过引入 `is_replying` 状态来解决此问题: 1. 在 `StreamContext` 中新增 `is_replying` 状态标志,用于追踪回复生成过程。 2. 当开始生成回复时,设置该标志为 `True`,并在回复完成或取消后通过 `finally` 块确保其恢复为 `False`。 3. `MessageManager` 的打断检查逻辑现在会首先检查此标志,如果为 `True` 则跳过打断,从而保护正在进行的回复。 4. `cancel_all_stream_tasks` 也增加了 `exclude_reply` 选项,确保即使触发打断,也不会取消回复任务。
This commit is contained in:
@@ -207,11 +207,12 @@ class ChatterManager:
|
||||
|
||||
return active_tasks
|
||||
|
||||
def cancel_all_stream_tasks(self, stream_id: str) -> int:
|
||||
def cancel_all_stream_tasks(self, stream_id: str, exclude_reply: bool = False) -> int:
|
||||
"""取消指定流的所有处理任务(包括多重回复)
|
||||
|
||||
Args:
|
||||
stream_id: 流ID
|
||||
exclude_reply: 是否排除回复任务
|
||||
|
||||
Returns:
|
||||
int: 成功取消的任务数量
|
||||
@@ -221,10 +222,15 @@ class ChatterManager:
|
||||
|
||||
tasks = self._processing_tasks[stream_id]
|
||||
cancelled_count = 0
|
||||
remaining_tasks = []
|
||||
|
||||
logger.info(f"开始取消流 {stream_id} 的所有处理任务,共 {len(tasks)} 个")
|
||||
logger.info(f"开始取消流 {stream_id} 的处理任务,共 {len(tasks)} 个")
|
||||
|
||||
for task in tasks:
|
||||
if exclude_reply and "reply" in task.get_name().lower():
|
||||
remaining_tasks.append(task)
|
||||
continue
|
||||
|
||||
try:
|
||||
if not task.done():
|
||||
task.cancel()
|
||||
@@ -233,8 +239,12 @@ class ChatterManager:
|
||||
except Exception as e:
|
||||
logger.warning(f"取消任务时出错: {e}")
|
||||
|
||||
# 清理任务记录
|
||||
del self._processing_tasks[stream_id]
|
||||
if remaining_tasks:
|
||||
self._processing_tasks[stream_id] = remaining_tasks
|
||||
else:
|
||||
if stream_id in self._processing_tasks:
|
||||
del self._processing_tasks[stream_id]
|
||||
|
||||
logger.info(f"流 {stream_id} 的任务取消完成,成功取消 {cancelled_count} 个任务")
|
||||
return cancelled_count
|
||||
|
||||
|
||||
@@ -352,11 +352,16 @@ class MessageManager:
|
||||
if not global_config.chat.interruption_enabled or not chat_stream:
|
||||
return
|
||||
|
||||
# 🌟 修复:获取所有处理任务(包括多重回复)
|
||||
# 获取所有处理任务
|
||||
all_processing_tasks = self.chatter_manager.get_all_processing_tasks(chat_stream.stream_id)
|
||||
|
||||
if all_processing_tasks:
|
||||
# 计算打断概率 - 使用新的线性概率模型
|
||||
# 检查是否有回复任务正在进行
|
||||
if chat_stream.context_manager.context.is_replying:
|
||||
logger.debug(f"聊天流 {chat_stream.stream_id} 正在回复,跳过打断检查")
|
||||
return
|
||||
|
||||
# 计算打断概率
|
||||
interruption_probability = chat_stream.context_manager.context.calculate_interruption_probability(
|
||||
global_config.chat.interruption_max_limit
|
||||
)
|
||||
@@ -364,39 +369,28 @@ class MessageManager:
|
||||
# 检查是否已达到最大打断次数
|
||||
if chat_stream.context_manager.context.interruption_count >= global_config.chat.interruption_max_limit:
|
||||
logger.debug(
|
||||
f"聊天流 {chat_stream.stream_id} 已达到最大打断次数 {chat_stream.context_manager.context.interruption_count}/{global_config.chat.interruption_max_limit},跳过打断检查"
|
||||
f"聊天流 {chat_stream.stream_id} 已达到最大打断次数,跳过打断检查"
|
||||
)
|
||||
return
|
||||
|
||||
# 根据概率决定是否打断
|
||||
if random.random() < interruption_probability:
|
||||
logger.info(f"聊天流 {chat_stream.stream_id} 触发消息打断,打断概率: {interruption_probability:.2f},检测到 {len(all_processing_tasks)} 个任务")
|
||||
logger.info(f"聊天流 {chat_stream.stream_id} 触发消息打断 (概率: {interruption_probability:.2f})")
|
||||
|
||||
# 🌟 修复:取消所有任务(包括多重回复)
|
||||
cancelled_count = self.chatter_manager.cancel_all_stream_tasks(chat_stream.stream_id)
|
||||
# 取消所有非回复任务
|
||||
cancelled_count = self.chatter_manager.cancel_all_stream_tasks(chat_stream.stream_id, exclude_reply=True)
|
||||
|
||||
if cancelled_count > 0:
|
||||
logger.info(f"消息打断成功取消 {cancelled_count} 个任务: {chat_stream.stream_id}")
|
||||
else:
|
||||
logger.warning(f"消息打断未能取消任何任务: {chat_stream.stream_id}")
|
||||
|
||||
# 增加打断计数
|
||||
await chat_stream.context_manager.context.increment_interruption_count()
|
||||
|
||||
# 🚀 新增:打断后立即重新进入聊天流程
|
||||
# 立即重新处理
|
||||
await self._trigger_immediate_reprocess(chat_stream)
|
||||
|
||||
# 检查是否已达到最大次数
|
||||
if chat_stream.context_manager.context.interruption_count >= global_config.chat.interruption_max_limit:
|
||||
logger.warning(
|
||||
f"聊天流 {chat_stream.stream_id} 已达到最大打断次数 {chat_stream.context_manager.context.interruption_count}/{global_config.chat.interruption_max_limit},后续消息将不再打断"
|
||||
)
|
||||
else:
|
||||
logger.info(
|
||||
f"聊天流 {chat_stream.stream_id} 已打断并重新进入处理流程,当前打断次数: {chat_stream.context_manager.context.interruption_count}/{global_config.chat.interruption_max_limit}"
|
||||
)
|
||||
else:
|
||||
logger.debug(f"聊天流 {chat_stream.stream_id} 未触发打断,打断概率: {interruption_probability:.2f},检测到 {len(all_processing_tasks)} 个任务")
|
||||
logger.debug(f"聊天流 {chat_stream.stream_id} 未触发打断 (概率: {interruption_probability:.2f})")
|
||||
|
||||
async def _trigger_immediate_reprocess(self, chat_stream: ChatStream):
|
||||
"""打断后立即重新进入聊天流程"""
|
||||
|
||||
@@ -248,6 +248,7 @@ class ChatterActionManager:
|
||||
else:
|
||||
# 生成回复
|
||||
try:
|
||||
chat_stream.context_manager.context.is_replying = True
|
||||
success, response_set, _ = await generator_api.generate_reply(
|
||||
chat_stream=chat_stream,
|
||||
reply_message=target_message,
|
||||
@@ -265,6 +266,8 @@ class ChatterActionManager:
|
||||
except asyncio.CancelledError:
|
||||
logger.debug(f"{log_prefix} 并行执行:回复生成任务已被取消")
|
||||
return {"action_type": "reply", "success": False, "reply_text": "", "loop_info": None}
|
||||
finally:
|
||||
chat_stream.context_manager.context.is_replying = False
|
||||
|
||||
# 发送并存储回复
|
||||
loop_info, reply_text, cycle_timers_reply = await self._send_and_store_reply(
|
||||
|
||||
@@ -42,6 +42,7 @@ class StreamContext(BaseDataModel):
|
||||
processing_task: asyncio.Task | None = None
|
||||
interruption_count: int = 0 # 打断计数器
|
||||
last_interruption_time: float = 0.0 # 上次打断时间
|
||||
is_replying: bool = False # 是否正在回复
|
||||
|
||||
# 独立分发周期字段
|
||||
next_check_time: float = field(default_factory=time.time) # 下次检查时间
|
||||
|
||||
Reference in New Issue
Block a user