feat:添加思考状态发送,优化s4u队列

This commit is contained in:
SengokuCola
2025-07-15 02:53:54 +08:00
parent 807e1e1406
commit 443f0a4f6f
3 changed files with 226 additions and 82 deletions

View File

@@ -12,7 +12,8 @@ from src.common.message.api import get_global_api
from src.chat.message_receive.storage import MessageStorage
from .s4u_watching_manager import watching_manager
import json
from src.person_info.relationship_builder_manager import relationship_builder_manager
from .loading import send_loading, send_unloading
logger = get_logger("S4U_chat")
@@ -28,6 +29,7 @@ class MessageSenderContainer:
self._task: Optional[asyncio.Task] = None
self._paused_event = asyncio.Event()
self._paused_event.set() # 默认设置为非暂停状态
async def add_message(self, chunk: str):
"""向队列中添加一个消息块。"""
@@ -142,7 +144,7 @@ def get_s4u_chat_manager() -> S4UChatManager:
class S4UChat:
_MESSAGE_TIMEOUT_SECONDS = 30 # 普通消息存活时间(秒)
_MESSAGE_TIMEOUT_SECONDS = 120 # 普通消息存活时间(秒)
def __init__(self, chat_stream: ChatStream):
"""初始化 S4UChat 实例。"""
@@ -150,6 +152,7 @@ class S4UChat:
self.chat_stream = chat_stream
self.stream_id = chat_stream.stream_id
self.stream_name = get_chat_manager().get_stream_name(self.stream_id) or self.stream_id
self.relationship_builder = relationship_builder_manager.get_or_create_builder(self.stream_id)
# 两个消息队列
self._vip_queue = asyncio.PriorityQueue()
@@ -167,7 +170,7 @@ class S4UChat:
self.gpt = S4UStreamGenerator()
self.interest_dict: Dict[str, float] = {} # 用户兴趣分
self.at_bot_priority_bonus = 100.0 # @机器人的优先级加成
self.normal_queue_max_size = 5 # 普通队列最大容量
self.recent_message_keep_count = 6 # 保留最近N条消息超出范围的普通消息将被移除
logger.info(f"[{self.stream_name}] S4UChat with two-queue system initialized.")
def _get_priority_info(self, message: MessageRecv) -> dict:
@@ -211,6 +214,9 @@ class S4UChat:
async def add_message(self, message: MessageRecv) -> None:
"""根据VIP状态和中断逻辑将消息放入相应队列。"""
await self.relationship_builder.build_relation()
priority_info = self._get_priority_info(message)
is_vip = self._is_vip(priority_info)
new_priority_score = self._calculate_base_priority_score(message, priority_info)
@@ -258,20 +264,46 @@ class S4UChat:
await self._vip_queue.put(item)
logger.info(f"[{self.stream_name}] VIP message added to queue.")
else:
# 应用普通队列的最大容量限制
if self._normal_queue.qsize() >= self.normal_queue_max_size:
# 队列已满,简单忽略新消息
# 更复杂的逻辑(如替换掉队列中优先级最低的)对于 asyncio.PriorityQueue 来说实现复杂
logger.debug(
f"[{self.stream_name}] Normal queue is full, ignoring new message from {message.message_info.user_info.user_id}"
)
return
await self._normal_queue.put(item)
self._entry_counter += 1
self._new_message_event.set() # 唤醒处理器
def _cleanup_old_normal_messages(self):
"""清理普通队列中不在最近N条消息范围内的消息"""
if self._normal_queue.empty():
return
# 计算阈值:保留最近 recent_message_keep_count 条消息
cutoff_counter = max(0, self._entry_counter - self.recent_message_keep_count)
# 临时存储需要保留的消息
temp_messages = []
removed_count = 0
# 取出所有普通队列中的消息
while not self._normal_queue.empty():
try:
item = self._normal_queue.get_nowait()
neg_priority, entry_count, timestamp, message = item
# 如果消息在最近N条消息范围内保留它
if entry_count >= cutoff_counter:
temp_messages.append(item)
else:
removed_count += 1
self._normal_queue.task_done() # 标记被移除的任务为完成
except asyncio.QueueEmpty:
break
# 将保留的消息重新放入队列
for item in temp_messages:
self._normal_queue.put_nowait(item)
if removed_count > 0:
logger.info(f"[{self.stream_name}] Cleaned up {removed_count} old normal messages outside recent {self.recent_message_keep_count} range.")
async def _message_processor(self):
"""调度器优先处理VIP队列然后处理普通队列。"""
while True:
@@ -279,6 +311,9 @@ class S4UChat:
# 等待有新消息的信号,避免空转
await self._new_message_event.wait()
self._new_message_event.clear()
# 清理普通队列中的过旧消息
self._cleanup_old_normal_messages()
# 优先处理VIP队列
if not self._vip_queue.empty():
@@ -335,46 +370,51 @@ class S4UChat:
await asyncio.sleep(1)
async def _generate_and_send(self, message: MessageRecv):
"""为单个消息生成文本和音频回复。整个过程可以被中断。"""
"""为单个消息生成文本回复。整个过程可以被中断。"""
self._is_replying = True
await send_loading(self.stream_id, "......")
# 视线管理:开始生成回复时切换视线状态
chat_watching = watching_manager.get_watching_by_chat_id(self.stream_id)
await chat_watching.on_reply_start()
# 回复生成实时展示:开始生成
user_name = message.message_info.user_info.user_nickname
sender_container = MessageSenderContainer(self.chat_stream, message)
sender_container.start()
try:
logger.info(f"[S4U] 开始为消息生成文本和音频流: '{message.processed_plain_text[:30]}...'")
# 1. 逐句生成文本、发送并播放音频
# 1. 逐句生成文本、发送
gen = self.gpt.generate_response(message, "")
async for chunk in gen:
# 如果任务被取消await 会在此处引发 CancelledError
# a. 发送文本块
await sender_container.add_message(chunk)
# b. 为该文本块生成并播放音频
# if chunk.strip():
# audio_data = await self.audio_generator.generate(chunk)
# player = MockAudioPlayer(audio_data)
# await player.play()
# 等待所有文本消息发送完成
await sender_container.close()
await sender_container.join()
logger.info(f"[{self.stream_name}] 所有文本和音频块处理完毕。")
logger.info(f"[{self.stream_name}] 所有文本块处理完毕。")
except asyncio.CancelledError:
logger.info(f"[{self.stream_name}] 回复流程(文本或音频)被中断。")
logger.info(f"[{self.stream_name}] 回复流程(文本)被中断。")
raise # 将取消异常向上传播
except Exception as e:
logger.error(f"[{self.stream_name}] 回复生成过程中出现错误: {e}", exc_info=True)
# 回复生成实时展示:清空内容(出错时)
finally:
self._is_replying = False
await send_unloading(self.stream_id)
# 视线管理:回复结束时切换视线状态
chat_watching = watching_manager.get_watching_by_chat_id(self.stream_id)
await chat_watching.on_reply_finished()