revert: 回退 commit 94e34c9370

This commit is contained in:
tt-P607
2025-10-11 15:40:15 +08:00
parent 0383a999fb
commit 244b95e0af
4 changed files with 23 additions and 31 deletions

View File

@@ -207,12 +207,11 @@ class ChatterManager:
return active_tasks return active_tasks
def cancel_all_stream_tasks(self, stream_id: str, exclude_reply: bool = False) -> int: def cancel_all_stream_tasks(self, stream_id: str) -> int:
"""取消指定流的所有处理任务(包括多重回复) """取消指定流的所有处理任务(包括多重回复)
Args: Args:
stream_id: 流ID stream_id: 流ID
exclude_reply: 是否排除回复任务
Returns: Returns:
int: 成功取消的任务数量 int: 成功取消的任务数量
@@ -222,15 +221,10 @@ class ChatterManager:
tasks = self._processing_tasks[stream_id] tasks = self._processing_tasks[stream_id]
cancelled_count = 0 cancelled_count = 0
remaining_tasks = []
logger.info(f"开始取消流 {stream_id} 的处理任务,共 {len(tasks)}") logger.info(f"开始取消流 {stream_id}所有处理任务,共 {len(tasks)}")
for task in tasks: for task in tasks:
if exclude_reply and "reply" in task.get_name().lower():
remaining_tasks.append(task)
continue
try: try:
if not task.done(): if not task.done():
task.cancel() task.cancel()
@@ -239,12 +233,8 @@ class ChatterManager:
except Exception as e: except Exception as e:
logger.warning(f"取消任务时出错: {e}") logger.warning(f"取消任务时出错: {e}")
if remaining_tasks: # 清理任务记录
self._processing_tasks[stream_id] = remaining_tasks del self._processing_tasks[stream_id]
else:
if stream_id in self._processing_tasks:
del self._processing_tasks[stream_id]
logger.info(f"{stream_id} 的任务取消完成,成功取消 {cancelled_count} 个任务") logger.info(f"{stream_id} 的任务取消完成,成功取消 {cancelled_count} 个任务")
return cancelled_count return cancelled_count

View File

@@ -352,16 +352,11 @@ class MessageManager:
if not global_config.chat.interruption_enabled or not chat_stream: if not global_config.chat.interruption_enabled or not chat_stream:
return return
# 获取所有处理任务 # 🌟 修复:获取所有处理任务(包括多重回复)
all_processing_tasks = self.chatter_manager.get_all_processing_tasks(chat_stream.stream_id) all_processing_tasks = self.chatter_manager.get_all_processing_tasks(chat_stream.stream_id)
if all_processing_tasks: if all_processing_tasks:
# 检查是否有回复任务正在进行 # 计算打断概率 - 使用新的线性概率模型
if chat_stream.context_manager.context.is_replying:
logger.debug(f"聊天流 {chat_stream.stream_id} 正在回复,跳过打断检查")
return
# 计算打断概率
interruption_probability = chat_stream.context_manager.context.calculate_interruption_probability( interruption_probability = chat_stream.context_manager.context.calculate_interruption_probability(
global_config.chat.interruption_max_limit global_config.chat.interruption_max_limit
) )
@@ -369,28 +364,39 @@ class MessageManager:
# 检查是否已达到最大打断次数 # 检查是否已达到最大打断次数
if chat_stream.context_manager.context.interruption_count >= global_config.chat.interruption_max_limit: if chat_stream.context_manager.context.interruption_count >= global_config.chat.interruption_max_limit:
logger.debug( logger.debug(
f"聊天流 {chat_stream.stream_id} 已达到最大打断次数,跳过打断检查" f"聊天流 {chat_stream.stream_id} 已达到最大打断次数 {chat_stream.context_manager.context.interruption_count}/{global_config.chat.interruption_max_limit},跳过打断检查"
) )
return return
# 根据概率决定是否打断 # 根据概率决定是否打断
if random.random() < interruption_probability: if random.random() < interruption_probability:
logger.info(f"聊天流 {chat_stream.stream_id} 触发消息打断 (概率: {interruption_probability:.2f})") logger.info(f"聊天流 {chat_stream.stream_id} 触发消息打断,打断概率: {interruption_probability:.2f},检测到 {len(all_processing_tasks)} 个任务")
# 取消所有非回复任务 # 🌟 修复:取消所有任务(包括多重回复)
cancelled_count = self.chatter_manager.cancel_all_stream_tasks(chat_stream.stream_id, exclude_reply=True) cancelled_count = self.chatter_manager.cancel_all_stream_tasks(chat_stream.stream_id)
if cancelled_count > 0: if cancelled_count > 0:
logger.info(f"消息打断成功取消 {cancelled_count} 个任务: {chat_stream.stream_id}") logger.info(f"消息打断成功取消 {cancelled_count} 个任务: {chat_stream.stream_id}")
else:
logger.warning(f"消息打断未能取消任何任务: {chat_stream.stream_id}")
# 增加打断计数 # 增加打断计数
await chat_stream.context_manager.context.increment_interruption_count() await chat_stream.context_manager.context.increment_interruption_count()
# 立即重新处理 # 🚀 新增:打断后立即重新进入聊天流程
await self._trigger_immediate_reprocess(chat_stream) await self._trigger_immediate_reprocess(chat_stream)
# 检查是否已达到最大次数
if chat_stream.context_manager.context.interruption_count >= global_config.chat.interruption_max_limit:
logger.warning(
f"聊天流 {chat_stream.stream_id} 已达到最大打断次数 {chat_stream.context_manager.context.interruption_count}/{global_config.chat.interruption_max_limit},后续消息将不再打断"
)
else:
logger.info(
f"聊天流 {chat_stream.stream_id} 已打断并重新进入处理流程,当前打断次数: {chat_stream.context_manager.context.interruption_count}/{global_config.chat.interruption_max_limit}"
)
else: else:
logger.debug(f"聊天流 {chat_stream.stream_id} 未触发打断 (概率: {interruption_probability:.2f})") logger.debug(f"聊天流 {chat_stream.stream_id} 未触发打断,打断概率: {interruption_probability:.2f},检测到 {len(all_processing_tasks)} 个任务")
async def _trigger_immediate_reprocess(self, chat_stream: ChatStream): async def _trigger_immediate_reprocess(self, chat_stream: ChatStream):
"""打断后立即重新进入聊天流程""" """打断后立即重新进入聊天流程"""

View File

@@ -248,7 +248,6 @@ class ChatterActionManager:
else: else:
# 生成回复 # 生成回复
try: try:
chat_stream.context_manager.context.is_replying = True
success, response_set, _ = await generator_api.generate_reply( success, response_set, _ = await generator_api.generate_reply(
chat_stream=chat_stream, chat_stream=chat_stream,
reply_message=target_message, reply_message=target_message,
@@ -266,8 +265,6 @@ class ChatterActionManager:
except asyncio.CancelledError: except asyncio.CancelledError:
logger.debug(f"{log_prefix} 并行执行:回复生成任务已被取消") logger.debug(f"{log_prefix} 并行执行:回复生成任务已被取消")
return {"action_type": "reply", "success": False, "reply_text": "", "loop_info": None} return {"action_type": "reply", "success": False, "reply_text": "", "loop_info": None}
finally:
chat_stream.context_manager.context.is_replying = False
# 发送并存储回复 # 发送并存储回复
loop_info, reply_text, cycle_timers_reply = await self._send_and_store_reply( loop_info, reply_text, cycle_timers_reply = await self._send_and_store_reply(

View File

@@ -42,7 +42,6 @@ class StreamContext(BaseDataModel):
processing_task: asyncio.Task | None = None processing_task: asyncio.Task | None = None
interruption_count: int = 0 # 打断计数器 interruption_count: int = 0 # 打断计数器
last_interruption_time: float = 0.0 # 上次打断时间 last_interruption_time: float = 0.0 # 上次打断时间
is_replying: bool = False # 是否正在回复
# 独立分发周期字段 # 独立分发周期字段
next_check_time: float = field(default_factory=time.time) # 下次检查时间 next_check_time: float = field(default_factory=time.time) # 下次检查时间