diff --git a/src/chat/normal_chat/normal_chat.py b/src/chat/normal_chat/normal_chat.py index 46366e800..5bf4abb17 100644 --- a/src/chat/normal_chat/normal_chat.py +++ b/src/chat/normal_chat/normal_chat.py @@ -302,50 +302,42 @@ class NormalChat: logger.info(f"[{self.stream_name}] 在处理上下文中检测到停止信号,退出") break - # 并行处理兴趣消息 - async def process_single_message(msg_id, message, interest_value, is_mentioned): - """处理单个兴趣消息""" - try: - # 在处理每个消息前检查停止状态 - if self._disabled: - logger.debug(f"[{self.stream_name}] 处理消息时检测到停用,跳过消息 {msg_id}") - return + semaphore = asyncio.Semaphore(5) - # 处理消息 - self.adjust_reply_frequency() + async def process_and_acquire(msg_id, message, interest_value, is_mentioned): + """处理单个兴趣消息并管理信号量""" + async with semaphore: + try: + # 在处理每个消息前检查停止状态 + if self._disabled: + logger.debug(f"[{self.stream_name}] 处理消息时检测到停用,跳过消息 {msg_id}") + return - await self.normal_response( - message=message, - is_mentioned=is_mentioned, - interested_rate=interest_value * self.willing_amplifier, - ) - except asyncio.CancelledError: - logger.debug(f"[{self.stream_name}] 处理消息 {msg_id} 时被取消") - raise # 重新抛出取消异常 - except Exception as e: - logger.error(f"[{self.stream_name}] 处理兴趣消息{msg_id}时出错: {e}") - # 不打印完整traceback,避免日志污染 - finally: - # 无论如何都要清理消息 - self.interest_dict.pop(msg_id, None) + # 处理消息 + self.adjust_reply_frequency() - # 创建并行任务列表 - coroutines = [] - for msg_id, (message, interest_value, is_mentioned) in items_to_process: - coroutine = process_single_message(msg_id, message, interest_value, is_mentioned) - coroutines.append(coroutine) + await self.normal_response( + message=message, + is_mentioned=is_mentioned, + interested_rate=interest_value * self.willing_amplifier, + ) + except asyncio.CancelledError: + logger.debug(f"[{self.stream_name}] 处理消息 {msg_id} 时被取消") + raise # 重新抛出取消异常 + except Exception as e: + logger.error(f"[{self.stream_name}] 处理兴趣消息{msg_id}时出错: {e}") + # 不打印完整traceback,避免日志污染 + finally: + # 无论如何都要清理消息 + self.interest_dict.pop(msg_id, None) + + tasks = [ + process_and_acquire(msg_id, message, interest_value, is_mentioned) + for msg_id, (message, interest_value, is_mentioned) in items_to_process + ] - # 并行执行所有任务,限制并发数量避免资源过度消耗 - if coroutines: - # 使用信号量控制并发数,最多同时处理5个消息 - semaphore = asyncio.Semaphore(5) - - async def limited_process(coroutine, sem): - async with sem: - await coroutine - - limited_tasks = [limited_process(coroutine, semaphore) for coroutine in coroutines] - await asyncio.gather(*limited_tasks, return_exceptions=True) + if tasks: + await asyncio.gather(*tasks, return_exceptions=True) except asyncio.CancelledError: logger.info(f"[{self.stream_name}] 处理上下文时任务被取消") diff --git a/src/plugins/built_in/core_actions/no_reply.py b/src/plugins/built_in/core_actions/no_reply.py index 160fbb626..c3cf0201c 100644 --- a/src/plugins/built_in/core_actions/no_reply.py +++ b/src/plugins/built_in/core_actions/no_reply.py @@ -355,14 +355,13 @@ class NoReplyAction(BaseAction): last_judge_time = time.time() # 异常时也更新时间,避免频繁重试 # 每10秒输出一次等待状态 - logger.info(f"{self.log_prefix} 开始等待新消息...") if elapsed_time < 60: if int(elapsed_time) % 10 == 0 and int(elapsed_time) > 0: logger.debug(f"{self.log_prefix} 已等待{elapsed_time:.0f}秒,等待新消息...") await asyncio.sleep(1) else: - if int(elapsed_time) % 60 == 0 and int(elapsed_time) > 0: - logger.debug(f"{self.log_prefix} 已等待{elapsed_time / 60:.0f}分钟,等待新消息...") + if int(elapsed_time) % 180 == 0 and int(elapsed_time) > 0: + logger.info(f"{self.log_prefix} 已等待{elapsed_time / 60:.0f}分钟,等待新消息...") await asyncio.sleep(1) # 短暂等待后继续检查