Merge branch 'dev' of https://github.com/MaiM-with-u/MaiBot into dev
This commit is contained in:
@@ -154,14 +154,9 @@ class S4UChat:
|
||||
# 两个消息队列
|
||||
self._vip_queue = asyncio.PriorityQueue()
|
||||
self._normal_queue = asyncio.PriorityQueue()
|
||||
|
||||
# 优先级管理配置
|
||||
self.normal_queue_max_size = 20 # 普通队列最大容量,可以后续移到配置文件
|
||||
self.interest_dict = {} # 用户兴趣分字典,可以后续移到配置文件. e.g. {"user_id": 5.0}
|
||||
self.at_bot_priority_bonus = 100.0 # @机器人时的额外优先分
|
||||
|
||||
self._entry_counter = 0 # 保证FIFO的全局计数器
|
||||
self._new_message_event = asyncio.Event() # 用于唤醒处理器
|
||||
self._new_message_event = asyncio.Event() # 用于唤醒处理器
|
||||
|
||||
self._processing_task = asyncio.create_task(self._message_processor())
|
||||
self._current_generation_task: Optional[asyncio.Task] = None
|
||||
@@ -201,18 +196,17 @@ class S4UChat:
|
||||
async def add_message(self, message: MessageRecv) -> None:
|
||||
"""根据VIP状态和中断逻辑将消息放入相应队列。"""
|
||||
is_vip = self._is_vip(message)
|
||||
# 优先级分数越高,优先级越高。
|
||||
new_priority_score = self._calculate_base_priority_score(message)
|
||||
|
||||
new_priority = self._get_message_priority(message)
|
||||
|
||||
should_interrupt = False
|
||||
if self._current_generation_task and not self._current_generation_task.done():
|
||||
if self._current_message_being_replied:
|
||||
current_queue, current_priority_score, _, current_msg = self._current_message_being_replied
|
||||
|
||||
current_queue, current_priority, _, current_msg = self._current_message_being_replied
|
||||
|
||||
# 规则:VIP从不被打断
|
||||
if current_queue == "vip":
|
||||
pass # Do nothing
|
||||
|
||||
pass # Do nothing
|
||||
|
||||
# 规则:普通消息可以被打断
|
||||
elif current_queue == "normal":
|
||||
# VIP消息可以打断普通消息
|
||||
@@ -231,10 +225,12 @@ class S4UChat:
|
||||
elif new_sender_id == current_sender_id and new_priority_score >= current_priority_score:
|
||||
should_interrupt = True
|
||||
logger.info(f"[{self.stream_name}] Same user sent new message, interrupting.")
|
||||
|
||||
|
||||
if should_interrupt:
|
||||
if self.gpt.partial_response:
|
||||
logger.warning(f"[{self.stream_name}] Interrupting reply. Already generated: '{self.gpt.partial_response}'")
|
||||
logger.warning(
|
||||
f"[{self.stream_name}] Interrupting reply. Already generated: '{self.gpt.partial_response}'"
|
||||
)
|
||||
self._current_generation_task.cancel()
|
||||
|
||||
# asyncio.PriorityQueue 是最小堆,所以我们存入分数的相反数
|
||||
@@ -253,9 +249,9 @@ class S4UChat:
|
||||
return
|
||||
|
||||
await self._normal_queue.put(item)
|
||||
|
||||
|
||||
self._entry_counter += 1
|
||||
self._new_message_event.set() # 唤醒处理器
|
||||
self._new_message_event.set() # 唤醒处理器
|
||||
|
||||
async def _message_processor(self):
|
||||
"""调度器:优先处理VIP队列,然后处理普通队列。"""
|
||||
@@ -276,12 +272,14 @@ class S4UChat:
|
||||
priority = -neg_priority
|
||||
# 检查普通消息是否超时
|
||||
if time.time() - timestamp > self._MESSAGE_TIMEOUT_SECONDS:
|
||||
logger.info(f"[{self.stream_name}] Discarding stale normal message: {message.processed_plain_text[:20]}...")
|
||||
logger.info(
|
||||
f"[{self.stream_name}] Discarding stale normal message: {message.processed_plain_text[:20]}..."
|
||||
)
|
||||
self._normal_queue.task_done()
|
||||
continue # 处理下一条
|
||||
continue # 处理下一条
|
||||
queue_name = "normal"
|
||||
else:
|
||||
continue # 没有消息了,回去等事件
|
||||
continue # 没有消息了,回去等事件
|
||||
|
||||
self._current_message_being_replied = (queue_name, priority, entry_count, message)
|
||||
self._current_generation_task = asyncio.create_task(self._generate_and_send(message))
|
||||
@@ -289,7 +287,9 @@ class S4UChat:
|
||||
try:
|
||||
await self._current_generation_task
|
||||
except asyncio.CancelledError:
|
||||
logger.info(f"[{self.stream_name}] Reply generation was interrupted externally for {queue_name} message. The message will be discarded.")
|
||||
logger.info(
|
||||
f"[{self.stream_name}] Reply generation was interrupted externally for {queue_name} message. The message will be discarded."
|
||||
)
|
||||
# 被中断的消息应该被丢弃,而不是重新排队,以响应最新的用户输入。
|
||||
# 旧的重新入队逻辑会导致所有中断的消息最终都被回复。
|
||||
|
||||
@@ -299,11 +299,11 @@ class S4UChat:
|
||||
self._current_generation_task = None
|
||||
self._current_message_being_replied = None
|
||||
# 标记任务完成
|
||||
if queue_name == 'vip':
|
||||
if queue_name == "vip":
|
||||
self._vip_queue.task_done()
|
||||
else:
|
||||
self._normal_queue.task_done()
|
||||
|
||||
|
||||
# 检查是否还有任务,有则立即再次触发事件
|
||||
if not self._vip_queue.empty() or not self._normal_queue.empty():
|
||||
self._new_message_event.set()
|
||||
|
||||
Reference in New Issue
Block a user