feat:为s4u添加优先级信息

This commit is contained in:
SengokuCola
2025-07-13 00:05:12 +08:00
parent 16c2a8b68e
commit 7757931da5
3 changed files with 27 additions and 935 deletions

View File

@@ -0,0 +1,112 @@
import time
import heapq
import math
import json
from typing import List, Optional
from src.common.logger import get_logger
logger = get_logger("normal_chat")
class PrioritizedMessage:
"""带有优先级的消息对象"""
def __init__(self, message_data: dict, interest_scores: List[float], is_vip: bool = False):
self.message_data = message_data
self.arrival_time = time.time()
self.interest_scores = interest_scores
self.is_vip = is_vip
self.priority = self.calculate_priority()
def calculate_priority(self, decay_rate: float = 0.01) -> float:
"""
计算优先级分数。
优先级 = 兴趣分 * exp(-衰减率 * 消息年龄)
"""
age = time.time() - self.arrival_time
decay_factor = math.exp(-decay_rate * age)
priority = sum(self.interest_scores) + decay_factor
return priority
def __lt__(self, other: "PrioritizedMessage") -> bool:
"""用于堆排序的比较函数,我们想要一个最大堆,所以用 >"""
return self.priority > other.priority
class PriorityManager:
"""
管理消息队列,根据优先级选择消息进行处理。
"""
def __init__(self, normal_queue_max_size: int = 5):
self.vip_queue: List[PrioritizedMessage] = [] # VIP 消息队列 (最大堆)
self.normal_queue: List[PrioritizedMessage] = [] # 普通消息队列 (最大堆)
self.normal_queue_max_size = normal_queue_max_size
def add_message(self, message_data: dict, interest_score: Optional[float] = None):
"""
添加新消息到合适的队列中。
"""
user_id = message_data.get("user_id")
priority_info_raw = message_data.get("priority_info")
priority_info = {}
if isinstance(priority_info_raw, str):
priority_info = json.loads(priority_info_raw)
elif isinstance(priority_info_raw, dict):
priority_info = priority_info_raw
is_vip = priority_info.get("message_type") == "vip"
message_priority = priority_info.get("message_priority", 0.0)
p_message = PrioritizedMessage(message_data, [interest_score, message_priority], is_vip)
if is_vip:
heapq.heappush(self.vip_queue, p_message)
logger.debug(f"消息来自VIP用户 {user_id}, 已添加到VIP队列. 当前VIP队列长度: {len(self.vip_queue)}")
else:
if len(self.normal_queue) >= self.normal_queue_max_size:
# 如果队列已满,只在消息优先级高于最低优先级消息时才添加
if p_message.priority > self.normal_queue[0].priority:
heapq.heapreplace(self.normal_queue, p_message)
logger.debug(f"普通队列已满,但新消息优先级更高,已替换. 用户: {user_id}")
else:
logger.debug(f"普通队列已满且新消息优先级较低,已忽略. 用户: {user_id}")
else:
heapq.heappush(self.normal_queue, p_message)
logger.debug(
f"消息来自普通用户 {user_id}, 已添加到普通队列. 当前普通队列长度: {len(self.normal_queue)}"
)
def get_highest_priority_message(self) -> Optional[dict]:
"""
从VIP和普通队列中获取当前最高优先级的消息。
"""
# 更新所有消息的优先级
for p_msg in self.vip_queue:
p_msg.priority = p_msg.calculate_priority()
for p_msg in self.normal_queue:
p_msg.priority = p_msg.calculate_priority()
# 重建堆
heapq.heapify(self.vip_queue)
heapq.heapify(self.normal_queue)
vip_msg = self.vip_queue[0] if self.vip_queue else None
normal_msg = self.normal_queue[0] if self.normal_queue else None
if vip_msg:
return heapq.heappop(self.vip_queue).message_data
elif normal_msg:
return heapq.heappop(self.normal_queue).message_data
else:
return None
def is_empty(self) -> bool:
"""检查所有队列是否为空"""
return not self.vip_queue and not self.normal_queue
def get_queue_status(self) -> str:
"""获取队列状态信息"""
return f"VIP队列: {len(self.vip_queue)}, 普通队列: {len(self.normal_queue)}"

View File

@@ -10,6 +10,7 @@ from src.chat.message_receive.message import MessageSending, MessageRecv
from src.config.config import global_config
from src.common.message.api import get_global_api
from src.chat.message_receive.storage import MessageStorage
import json
logger = get_logger("S4U_chat")
@@ -168,27 +169,40 @@ class S4UChat:
self.normal_queue_max_size = 50 # 普通队列最大容量
logger.info(f"[{self.stream_name}] S4UChat with two-queue system initialized.")
def _is_vip(self, message: MessageRecv) -> bool:
def _get_priority_info(self, message: MessageRecv) -> dict:
"""安全地从消息中提取和解析 priority_info"""
priority_info_raw = message.raw.get("priority_info")
priority_info = {}
if isinstance(priority_info_raw, str):
try:
priority_info = json.loads(priority_info_raw)
except json.JSONDecodeError:
logger.warning(f"Failed to parse priority_info JSON: {priority_info_raw}")
elif isinstance(priority_info_raw, dict):
priority_info = priority_info_raw
return priority_info
def _is_vip(self, priority_info: dict) -> bool:
"""检查消息是否来自VIP用户。"""
# 您需要修改此处或在配置文件中定义VIP用户
vip_user_ids = ["1026294844"]
vip_user_ids = [""]
return message.message_info.user_info.user_id in vip_user_ids
return priority_info.get("message_type") == "vip"
def _get_interest_score(self, user_id: str) -> float:
"""获取用户的兴趣分默认为1.0"""
return self.interest_dict.get(user_id, 1.0)
def _calculate_base_priority_score(self, message: MessageRecv) -> float:
def _calculate_base_priority_score(self, message: MessageRecv, priority_info: dict) -> float:
"""
为消息计算基础优先级分数。分数越高,优先级越高。
"""
score = 0.0
# 如果消息 @ 了机器人,则增加一个很大的分数
if f"@{global_config.bot.nickname}" in message.processed_plain_text or any(
f"@{alias}" in message.processed_plain_text for alias in global_config.bot.alias_names
):
score += self.at_bot_priority_bonus
# if f"@{global_config.bot.nickname}" in message.processed_plain_text or any(
# f"@{alias}" in message.processed_plain_text for alias in global_config.bot.alias_names
# ):
# score += self.at_bot_priority_bonus
# 加上消息自带的优先级
score += priority_info.get("message_priority", 0.0)
# 加上用户的固有兴趣分
score += self._get_interest_score(message.message_info.user_info.user_id)
@@ -196,8 +210,9 @@ class S4UChat:
async def add_message(self, message: MessageRecv) -> None:
"""根据VIP状态和中断逻辑将消息放入相应队列。"""
is_vip = self._is_vip(message)
new_priority_score = self._calculate_base_priority_score(message)
priority_info = self._get_priority_info(message)
is_vip = self._is_vip(priority_info)
new_priority_score = self._calculate_base_priority_score(message, priority_info)
should_interrupt = False
if self._current_generation_task and not self._current_generation_task.done():