🤖 自动格式化代码 [skip ci]
This commit is contained in:
@@ -1,39 +1,15 @@
|
||||
import asyncio
|
||||
import time
|
||||
import traceback
|
||||
import random
|
||||
from typing import List, Optional, Dict # 导入类型提示
|
||||
import os
|
||||
import pickle
|
||||
from typing import Optional, Dict # 导入类型提示
|
||||
from maim_message import UserInfo, Seg
|
||||
from src.common.logger import get_logger
|
||||
from src.chat.heart_flow.utils_chat import get_chat_type_and_target_info
|
||||
from src.manager.mood_manager import mood_manager
|
||||
from src.chat.message_receive.chat_stream import ChatStream, get_chat_manager
|
||||
from src.chat.utils.timer_calculator import Timer
|
||||
from src.chat.utils.prompt_builder import global_prompt_manager
|
||||
from .s4u_stream_generator import S4UStreamGenerator
|
||||
from src.chat.message_receive.message import MessageSending, MessageRecv, MessageThinking, MessageSet
|
||||
from src.chat.message_receive.message_sender import message_manager
|
||||
from src.chat.normal_chat.willing.willing_manager import get_willing_manager
|
||||
from src.chat.normal_chat.normal_chat_utils import get_recent_message_stats
|
||||
from src.chat.message_receive.message import MessageSending, MessageRecv
|
||||
from src.config.config import global_config
|
||||
from src.chat.focus_chat.planners.action_manager import ActionManager
|
||||
from src.chat.normal_chat.normal_chat_planner import NormalChatPlanner
|
||||
from src.chat.normal_chat.normal_chat_action_modifier import NormalChatActionModifier
|
||||
from src.chat.normal_chat.normal_chat_expressor import NormalChatExpressor
|
||||
from src.chat.focus_chat.replyer.default_generator import DefaultReplyer
|
||||
from src.person_info.person_info import PersonInfoManager
|
||||
from src.person_info.relationship_manager import get_relationship_manager
|
||||
from src.chat.utils.chat_message_builder import (
|
||||
get_raw_msg_by_timestamp_with_chat,
|
||||
get_raw_msg_by_timestamp_with_chat_inclusive,
|
||||
get_raw_msg_before_timestamp_with_chat,
|
||||
num_new_messages_since,
|
||||
)
|
||||
from src.common.message.api import get_global_api
|
||||
from src.chat.message_receive.storage import MessageStorage
|
||||
from src.audio.mock_audio import MockAudioGenerator, MockAudioPlayer
|
||||
|
||||
|
||||
logger = get_logger("S4U_chat")
|
||||
@@ -41,6 +17,7 @@ logger = get_logger("S4U_chat")
|
||||
|
||||
class MessageSenderContainer:
|
||||
"""一个简单的容器,用于按顺序发送消息并模拟打字效果。"""
|
||||
|
||||
def __init__(self, chat_stream: ChatStream, original_message: MessageRecv):
|
||||
self.chat_stream = chat_stream
|
||||
self.original_message = original_message
|
||||
@@ -71,7 +48,7 @@ class MessageSenderContainer:
|
||||
chars_per_second = 15.0
|
||||
min_delay = 0.2
|
||||
max_delay = 2.0
|
||||
|
||||
|
||||
delay = len(text) / chars_per_second
|
||||
return max(min_delay, min(delay, max_delay))
|
||||
|
||||
@@ -98,7 +75,7 @@ class MessageSenderContainer:
|
||||
|
||||
current_time = time.time()
|
||||
msg_id = f"{current_time}_{random.randint(1000, 9999)}"
|
||||
|
||||
|
||||
text_to_send = chunk
|
||||
if global_config.experimental.debug_show_chat_mode:
|
||||
text_to_send += "ⁿ"
|
||||
@@ -117,19 +94,19 @@ class MessageSenderContainer:
|
||||
reply=self.original_message,
|
||||
is_emoji=False,
|
||||
apply_set_reply_logic=True,
|
||||
reply_to=f"{self.original_message.message_info.user_info.platform}:{self.original_message.message_info.user_info.user_id}"
|
||||
reply_to=f"{self.original_message.message_info.user_info.platform}:{self.original_message.message_info.user_info.user_id}",
|
||||
)
|
||||
|
||||
|
||||
await bot_message.process()
|
||||
|
||||
|
||||
await get_global_api().send_message(bot_message)
|
||||
logger.info(f"已将消息 '{text_to_send}' 发往平台 '{bot_message.message_info.platform}'")
|
||||
|
||||
|
||||
await self.storage.store_message(bot_message, self.chat_stream)
|
||||
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"[{self.chat_stream.get_stream_name()}] 消息发送或存储时出现错误: {e}", exc_info=True)
|
||||
|
||||
|
||||
finally:
|
||||
# CRUCIAL: Always call task_done() for any item that was successfully retrieved.
|
||||
self.queue.task_done()
|
||||
@@ -138,7 +115,7 @@ class MessageSenderContainer:
|
||||
"""启动发送任务。"""
|
||||
if self._task is None:
|
||||
self._task = asyncio.create_task(self._send_worker())
|
||||
|
||||
|
||||
async def join(self):
|
||||
"""等待所有消息发送完毕。"""
|
||||
if self._task:
|
||||
@@ -156,8 +133,10 @@ class S4UChatManager:
|
||||
self.s4u_chats[chat_stream.stream_id] = S4UChat(chat_stream)
|
||||
return self.s4u_chats[chat_stream.stream_id]
|
||||
|
||||
|
||||
s4u_chat_manager = S4UChatManager()
|
||||
|
||||
|
||||
def get_s4u_chat_manager() -> S4UChatManager:
|
||||
return s4u_chat_manager
|
||||
|
||||
@@ -169,22 +148,19 @@ class S4UChat:
|
||||
self.chat_stream = chat_stream
|
||||
self.stream_id = chat_stream.stream_id
|
||||
self.stream_name = get_chat_manager().get_stream_name(self.stream_id) or self.stream_id
|
||||
|
||||
|
||||
self._message_queue = asyncio.Queue()
|
||||
self._processing_task = asyncio.create_task(self._message_processor())
|
||||
self._current_generation_task: Optional[asyncio.Task] = None
|
||||
self._current_message_being_replied: Optional[MessageRecv] = None
|
||||
|
||||
|
||||
self._is_replying = False
|
||||
|
||||
self.gpt = S4UStreamGenerator()
|
||||
# self.audio_generator = MockAudioGenerator()
|
||||
|
||||
|
||||
|
||||
logger.info(f"[{self.stream_name}] S4UChat")
|
||||
|
||||
|
||||
# 改为实例方法, 移除 chat 参数
|
||||
async def response(self, message: MessageRecv, is_mentioned: bool, interested_rate: float) -> None:
|
||||
"""将消息放入队列并根据发信人决定是否中断当前处理。"""
|
||||
@@ -226,8 +202,8 @@ class S4UChat:
|
||||
# 如果因快速中断导致队列中积压了更多消息,则只处理最新的一条
|
||||
while not self._message_queue.empty():
|
||||
drained_msg = self._message_queue.get_nowait()
|
||||
self._message_queue.task_done() # 为取出的旧消息调用 task_done
|
||||
message = drained_msg # 始终处理最新消息
|
||||
self._message_queue.task_done() # 为取出的旧消息调用 task_done
|
||||
message = drained_msg # 始终处理最新消息
|
||||
self._current_message_being_replied = message
|
||||
logger.info(f"[{self.stream_name}] 丢弃过时消息,处理最新消息: {message.processed_plain_text}")
|
||||
|
||||
@@ -242,44 +218,40 @@ class S4UChat:
|
||||
finally:
|
||||
self._current_generation_task = None
|
||||
self._current_message_being_replied = None
|
||||
|
||||
|
||||
except asyncio.CancelledError:
|
||||
logger.info(f"[{self.stream_name}] 消息处理器正在关闭。")
|
||||
break
|
||||
except Exception as e:
|
||||
logger.error(f"[{self.stream_name}] 消息处理器主循环发生未知错误: {e}", exc_info=True)
|
||||
await asyncio.sleep(1) # 避免在未知错误下陷入CPU空转
|
||||
await asyncio.sleep(1) # 避免在未知错误下陷入CPU空转
|
||||
finally:
|
||||
# 确保处理过的消息(无论是正常完成还是被丢弃)都被标记完成
|
||||
if 'message' in locals():
|
||||
if "message" in locals():
|
||||
self._message_queue.task_done()
|
||||
|
||||
|
||||
async def _generate_and_send(self, message: MessageRecv):
|
||||
"""为单个消息生成文本和音频回复。整个过程可以被中断。"""
|
||||
self._is_replying = True
|
||||
sender_container = MessageSenderContainer(self.chat_stream, message)
|
||||
sender_container.start()
|
||||
|
||||
|
||||
try:
|
||||
logger.info(
|
||||
f"[S4U] 开始为消息生成文本和音频流: "
|
||||
f"'{message.processed_plain_text[:30]}...'"
|
||||
)
|
||||
|
||||
logger.info(f"[S4U] 开始为消息生成文本和音频流: '{message.processed_plain_text[:30]}...'")
|
||||
|
||||
# 1. 逐句生成文本、发送并播放音频
|
||||
gen = self.gpt.generate_response(message, "")
|
||||
async for chunk in gen:
|
||||
# 如果任务被取消,await 会在此处引发 CancelledError
|
||||
|
||||
|
||||
# a. 发送文本块
|
||||
await sender_container.add_message(chunk)
|
||||
|
||||
|
||||
# b. 为该文本块生成并播放音频
|
||||
# if chunk.strip():
|
||||
# audio_data = await self.audio_generator.generate(chunk)
|
||||
# player = MockAudioPlayer(audio_data)
|
||||
# await player.play()
|
||||
# audio_data = await self.audio_generator.generate(chunk)
|
||||
# player = MockAudioPlayer(audio_data)
|
||||
# await player.play()
|
||||
|
||||
# 等待所有文本消息发送完成
|
||||
await sender_container.close()
|
||||
@@ -300,20 +272,19 @@ class S4UChat:
|
||||
await sender_container.join()
|
||||
logger.info(f"[{self.stream_name}] _generate_and_send 任务结束,资源已清理。")
|
||||
|
||||
|
||||
async def shutdown(self):
|
||||
"""平滑关闭处理任务。"""
|
||||
logger.info(f"正在关闭 S4UChat: {self.stream_name}")
|
||||
|
||||
|
||||
# 取消正在运行的任务
|
||||
if self._current_generation_task and not self._current_generation_task.done():
|
||||
self._current_generation_task.cancel()
|
||||
|
||||
|
||||
if self._processing_task and not self._processing_task.done():
|
||||
self._processing_task.cancel()
|
||||
|
||||
|
||||
# 等待任务响应取消
|
||||
try:
|
||||
await self._processing_task
|
||||
except asyncio.CancelledError:
|
||||
logger.info(f"处理任务已成功取消: {self.stream_name}")
|
||||
logger.info(f"处理任务已成功取消: {self.stream_name}")
|
||||
|
||||
Reference in New Issue
Block a user