fix:normal出现未正常进行的一步操作
This commit is contained in:
@@ -302,50 +302,42 @@ class NormalChat:
|
|||||||
logger.info(f"[{self.stream_name}] 在处理上下文中检测到停止信号,退出")
|
logger.info(f"[{self.stream_name}] 在处理上下文中检测到停止信号,退出")
|
||||||
break
|
break
|
||||||
|
|
||||||
# 并行处理兴趣消息
|
semaphore = asyncio.Semaphore(5)
|
||||||
async def process_single_message(msg_id, message, interest_value, is_mentioned):
|
|
||||||
"""处理单个兴趣消息"""
|
|
||||||
try:
|
|
||||||
# 在处理每个消息前检查停止状态
|
|
||||||
if self._disabled:
|
|
||||||
logger.debug(f"[{self.stream_name}] 处理消息时检测到停用,跳过消息 {msg_id}")
|
|
||||||
return
|
|
||||||
|
|
||||||
# 处理消息
|
async def process_and_acquire(msg_id, message, interest_value, is_mentioned):
|
||||||
self.adjust_reply_frequency()
|
"""处理单个兴趣消息并管理信号量"""
|
||||||
|
async with semaphore:
|
||||||
|
try:
|
||||||
|
# 在处理每个消息前检查停止状态
|
||||||
|
if self._disabled:
|
||||||
|
logger.debug(f"[{self.stream_name}] 处理消息时检测到停用,跳过消息 {msg_id}")
|
||||||
|
return
|
||||||
|
|
||||||
await self.normal_response(
|
# 处理消息
|
||||||
message=message,
|
self.adjust_reply_frequency()
|
||||||
is_mentioned=is_mentioned,
|
|
||||||
interested_rate=interest_value * self.willing_amplifier,
|
|
||||||
)
|
|
||||||
except asyncio.CancelledError:
|
|
||||||
logger.debug(f"[{self.stream_name}] 处理消息 {msg_id} 时被取消")
|
|
||||||
raise # 重新抛出取消异常
|
|
||||||
except Exception as e:
|
|
||||||
logger.error(f"[{self.stream_name}] 处理兴趣消息{msg_id}时出错: {e}")
|
|
||||||
# 不打印完整traceback,避免日志污染
|
|
||||||
finally:
|
|
||||||
# 无论如何都要清理消息
|
|
||||||
self.interest_dict.pop(msg_id, None)
|
|
||||||
|
|
||||||
# 创建并行任务列表
|
await self.normal_response(
|
||||||
coroutines = []
|
message=message,
|
||||||
for msg_id, (message, interest_value, is_mentioned) in items_to_process:
|
is_mentioned=is_mentioned,
|
||||||
coroutine = process_single_message(msg_id, message, interest_value, is_mentioned)
|
interested_rate=interest_value * self.willing_amplifier,
|
||||||
coroutines.append(coroutine)
|
)
|
||||||
|
except asyncio.CancelledError:
|
||||||
|
logger.debug(f"[{self.stream_name}] 处理消息 {msg_id} 时被取消")
|
||||||
|
raise # 重新抛出取消异常
|
||||||
|
except Exception as e:
|
||||||
|
logger.error(f"[{self.stream_name}] 处理兴趣消息{msg_id}时出错: {e}")
|
||||||
|
# 不打印完整traceback,避免日志污染
|
||||||
|
finally:
|
||||||
|
# 无论如何都要清理消息
|
||||||
|
self.interest_dict.pop(msg_id, None)
|
||||||
|
|
||||||
# 并行执行所有任务,限制并发数量避免资源过度消耗
|
tasks = [
|
||||||
if coroutines:
|
process_and_acquire(msg_id, message, interest_value, is_mentioned)
|
||||||
# 使用信号量控制并发数,最多同时处理5个消息
|
for msg_id, (message, interest_value, is_mentioned) in items_to_process
|
||||||
semaphore = asyncio.Semaphore(5)
|
]
|
||||||
|
|
||||||
async def limited_process(coroutine, sem):
|
if tasks:
|
||||||
async with sem:
|
await asyncio.gather(*tasks, return_exceptions=True)
|
||||||
await coroutine
|
|
||||||
|
|
||||||
limited_tasks = [limited_process(coroutine, semaphore) for coroutine in coroutines]
|
|
||||||
await asyncio.gather(*limited_tasks, return_exceptions=True)
|
|
||||||
|
|
||||||
except asyncio.CancelledError:
|
except asyncio.CancelledError:
|
||||||
logger.info(f"[{self.stream_name}] 处理上下文时任务被取消")
|
logger.info(f"[{self.stream_name}] 处理上下文时任务被取消")
|
||||||
|
|||||||
@@ -355,14 +355,13 @@ class NoReplyAction(BaseAction):
|
|||||||
last_judge_time = time.time() # 异常时也更新时间,避免频繁重试
|
last_judge_time = time.time() # 异常时也更新时间,避免频繁重试
|
||||||
|
|
||||||
# 每10秒输出一次等待状态
|
# 每10秒输出一次等待状态
|
||||||
logger.info(f"{self.log_prefix} 开始等待新消息...")
|
|
||||||
if elapsed_time < 60:
|
if elapsed_time < 60:
|
||||||
if int(elapsed_time) % 10 == 0 and int(elapsed_time) > 0:
|
if int(elapsed_time) % 10 == 0 and int(elapsed_time) > 0:
|
||||||
logger.debug(f"{self.log_prefix} 已等待{elapsed_time:.0f}秒,等待新消息...")
|
logger.debug(f"{self.log_prefix} 已等待{elapsed_time:.0f}秒,等待新消息...")
|
||||||
await asyncio.sleep(1)
|
await asyncio.sleep(1)
|
||||||
else:
|
else:
|
||||||
if int(elapsed_time) % 60 == 0 and int(elapsed_time) > 0:
|
if int(elapsed_time) % 180 == 0 and int(elapsed_time) > 0:
|
||||||
logger.debug(f"{self.log_prefix} 已等待{elapsed_time / 60:.0f}分钟,等待新消息...")
|
logger.info(f"{self.log_prefix} 已等待{elapsed_time / 60:.0f}分钟,等待新消息...")
|
||||||
await asyncio.sleep(1)
|
await asyncio.sleep(1)
|
||||||
|
|
||||||
# 短暂等待后继续检查
|
# 短暂等待后继续检查
|
||||||
|
|||||||
Reference in New Issue
Block a user