🤖 自动格式化代码 [skip ci]

This commit is contained in:
github-actions[bot]
2025-07-01 16:22:13 +00:00
parent bb2a95e388
commit b369d12c90
4 changed files with 36 additions and 33 deletions

View File

@@ -154,9 +154,9 @@ class S4UChat:
# 两个消息队列
self._vip_queue = asyncio.PriorityQueue()
self._normal_queue = asyncio.PriorityQueue()
self._entry_counter = 0 # 保证FIFO的全局计数器
self._new_message_event = asyncio.Event() # 用于唤醒处理器
self._new_message_event = asyncio.Event() # 用于唤醒处理器
self._processing_task = asyncio.create_task(self._message_processor())
self._current_generation_task: Optional[asyncio.Task] = None
@@ -186,16 +186,16 @@ class S4UChat:
"""根据VIP状态和中断逻辑将消息放入相应队列。"""
is_vip = self._is_vip(message)
new_priority = self._get_message_priority(message)
should_interrupt = False
if self._current_generation_task and not self._current_generation_task.done():
if self._current_message_being_replied:
current_queue, current_priority, _, current_msg = self._current_message_being_replied
# 规则VIP从不被打断
if current_queue == "vip":
pass # Do nothing
pass # Do nothing
# 规则:普通消息可以被打断
elif current_queue == "normal":
# VIP消息可以打断普通消息
@@ -214,10 +214,12 @@ class S4UChat:
elif new_sender_id == current_sender_id and new_priority <= current_priority:
should_interrupt = True
logger.info(f"[{self.stream_name}] Same user sent new message, interrupting.")
if should_interrupt:
if self.gpt.partial_response:
logger.warning(f"[{self.stream_name}] Interrupting reply. Already generated: '{self.gpt.partial_response}'")
logger.warning(
f"[{self.stream_name}] Interrupting reply. Already generated: '{self.gpt.partial_response}'"
)
self._current_generation_task.cancel()
# 将消息放入对应的队列
@@ -227,9 +229,9 @@ class S4UChat:
logger.info(f"[{self.stream_name}] VIP message added to queue.")
else:
await self._normal_queue.put(item)
self._entry_counter += 1
self._new_message_event.set() # 唤醒处理器
self._new_message_event.set() # 唤醒处理器
async def _message_processor(self):
"""调度器优先处理VIP队列然后处理普通队列。"""
@@ -248,12 +250,14 @@ class S4UChat:
priority, entry_count, timestamp, message = self._normal_queue.get_nowait()
# 检查普通消息是否超时
if time.time() - timestamp > self._MESSAGE_TIMEOUT_SECONDS:
logger.info(f"[{self.stream_name}] Discarding stale normal message: {message.processed_plain_text[:20]}...")
logger.info(
f"[{self.stream_name}] Discarding stale normal message: {message.processed_plain_text[:20]}..."
)
self._normal_queue.task_done()
continue # 处理下一条
continue # 处理下一条
queue_name = "normal"
else:
continue # 没有消息了,回去等事件
continue # 没有消息了,回去等事件
self._current_message_being_replied = (queue_name, priority, entry_count, message)
self._current_generation_task = asyncio.create_task(self._generate_and_send(message))
@@ -261,7 +265,9 @@ class S4UChat:
try:
await self._current_generation_task
except asyncio.CancelledError:
logger.info(f"[{self.stream_name}] Reply generation was interrupted externally for {queue_name} message. The message will be discarded.")
logger.info(
f"[{self.stream_name}] Reply generation was interrupted externally for {queue_name} message. The message will be discarded."
)
# 被中断的消息应该被丢弃,而不是重新排队,以响应最新的用户输入。
# 旧的重新入队逻辑会导致所有中断的消息最终都被回复。
@@ -271,11 +277,11 @@ class S4UChat:
self._current_generation_task = None
self._current_message_being_replied = None
# 标记任务完成
if queue_name == 'vip':
if queue_name == "vip":
self._vip_queue.task_done()
else:
self._normal_queue.task_done()
# 检查是否还有任务,有则立即再次触发事件
if not self._vip_queue.empty() or not self._normal_queue.empty():
self._new_message_event.set()