Files
Mofox-Core/src/chat/message_receive/message_sender.py

310 lines
13 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

# src/plugins/chat/message_sender.py
import asyncio
import time
from asyncio import Task
from typing import Union
from src.common.message.api import get_global_api
# from ...common.database import db # 数据库依赖似乎不需要了,注释掉
from .message import MessageSending, MessageThinking, MessageSet
from src.chat.message_receive.storage import MessageStorage
from ..utils.utils import truncate_message, calculate_typing_time, count_messages_between
from src.common.logger import get_logger
from rich.traceback import install
install(extra_lines=3)
logger = get_logger("sender")
async def send_via_ws(message: MessageSending) -> None:
"""通过 WebSocket 发送消息"""
try:
await get_global_api().send_message(message)
except Exception as e:
logger.error(f"WS发送失败: {e}")
raise ValueError(f"未找到平台:{message.message_info.platform} 的url配置请检查配置文件") from e
async def send_message(
message: MessageSending,
) -> None:
"""发送消息(核心发送逻辑)"""
# --- 添加计算打字和延迟的逻辑 (从 heartflow_message_sender 移动并调整) ---
typing_time = calculate_typing_time(
input_string=message.processed_plain_text,
thinking_start_time=message.thinking_start_time,
is_emoji=message.is_emoji,
)
# logger.debug(f"{message.processed_plain_text},{typing_time},计算输入时间结束") # 减少日志
await asyncio.sleep(typing_time)
# logger.debug(f"{message.processed_plain_text},{typing_time},等待输入时间结束") # 减少日志
# --- 结束打字延迟 ---
message_preview = truncate_message(message.processed_plain_text)
try:
await send_via_ws(message)
logger.info(f"发送消息 '{message_preview}' 成功") # 调整日志格式
except Exception as e:
logger.error(f"发送消息 '{message_preview}' 失败: {str(e)}")
class MessageSender:
"""发送器 (不再是单例)"""
def __init__(self):
self.message_interval = (0.5, 1) # 消息间隔时间范围(秒)
self.last_send_time = 0
self._current_bot = None
def set_bot(self, bot):
"""设置当前bot实例"""
pass
class MessageContainer:
"""单个聊天流的发送/思考消息容器"""
def __init__(self, chat_id: str, max_size: int = 100):
self.chat_id = chat_id
self.max_size = max_size
self.messages: list[MessageThinking | MessageSending] = [] # 明确类型
self.last_send_time = 0
self.thinking_wait_timeout = 20 # 思考等待超时时间(秒) - 从旧 sender 合并
def count_thinking_messages(self) -> int:
"""计算当前容器中思考消息的数量"""
return sum(1 for msg in self.messages if isinstance(msg, MessageThinking))
def get_timeout_sending_messages(self) -> list[MessageSending]:
"""获取所有超时的MessageSending对象思考时间超过20秒按thinking_start_time排序 - 从旧 sender 合并"""
current_time = time.time()
timeout_messages = []
for msg in self.messages:
# 只检查 MessageSending 类型
if isinstance(msg, MessageSending):
# 确保 thinking_start_time 有效
if msg.thinking_start_time and current_time - msg.thinking_start_time > self.thinking_wait_timeout:
timeout_messages.append(msg)
# 按thinking_start_time排序时间早的在前面
timeout_messages.sort(key=lambda x: x.thinking_start_time)
return timeout_messages
def get_earliest_message(self):
"""获取thinking_start_time最早的消息对象"""
if not self.messages:
return None
earliest_time = float("inf")
earliest_message = None
for msg in self.messages:
# 确保消息有 thinking_start_time 属性
msg_time = getattr(msg, "thinking_start_time", float("inf"))
if msg_time < earliest_time:
earliest_time = msg_time
earliest_message = msg
return earliest_message
def add_message(self, message: Union[MessageThinking, MessageSending, MessageSet]):
"""添加消息到队列"""
if isinstance(message, MessageSet):
for single_message in message.messages:
self.messages.append(single_message)
else:
self.messages.append(message)
def remove_message(self, message_to_remove: Union[MessageThinking, MessageSending]):
"""移除指定的消息对象如果消息存在则返回True否则返回False"""
try:
_initial_len = len(self.messages)
# 使用列表推导式或 message_filter 创建新列表,排除要删除的元素
# self.messages = [msg for msg in self.messages if msg is not message_to_remove]
# 或者直接 remove (如果确定对象唯一性)
if message_to_remove in self.messages:
self.messages.remove(message_to_remove)
return True
# logger.debug(f"Removed message {getattr(message_to_remove, 'message_info', {}).get('message_id', 'UNKNOWN')}. Old len: {initial_len}, New len: {len(self.messages)}")
# return len(self.messages) < initial_len
return False
except Exception as e:
logger.exception(f"移除消息时发生错误: {e}")
return False
def has_messages(self) -> bool:
"""检查是否有待发送的消息"""
return bool(self.messages)
def get_all_messages(self) -> list[MessageThinking | MessageSending]:
"""获取所有消息"""
return list(self.messages) # 返回副本
class MessageManager:
"""管理所有聊天流的消息容器 (不再是单例)"""
def __init__(self):
self._processor_task: Task | None = None
self.containers: dict[str, MessageContainer] = {}
self.storage = MessageStorage() # 添加 storage 实例
self._running = True # 处理器运行状态
self._container_lock = asyncio.Lock() # 保护 containers 字典的锁
# self.message_sender = MessageSender() # 创建发送器实例 (改为全局实例)
async def start(self):
"""启动后台处理器任务。"""
# 检查是否已有任务在运行,避免重复启动
if self._processor_task is not None and not self._processor_task.done():
logger.warning("Processor task already running.")
return
self._processor_task = asyncio.create_task(self._start_processor_loop())
logger.debug("MessageManager processor task started.")
def stop(self):
"""停止后台处理器任务。"""
self._running = False
if self._processor_task is not None and not self._processor_task.done():
self._processor_task.cancel()
logger.debug("MessageManager processor task stopping.")
else:
logger.debug("MessageManager processor task not running or already stopped.")
async def get_container(self, chat_id: str) -> MessageContainer:
"""获取或创建聊天流的消息容器 (异步,使用锁)"""
async with self._container_lock:
if chat_id not in self.containers:
self.containers[chat_id] = MessageContainer(chat_id)
return self.containers[chat_id]
async def add_message(self, message: Union[MessageThinking, MessageSending, MessageSet]) -> None:
"""添加消息到对应容器"""
chat_stream = message.chat_stream
if not chat_stream:
logger.error("消息缺少 chat_stream无法添加到容器")
return # 或者抛出异常
container = await self.get_container(chat_stream.stream_id)
container.add_message(message)
async def _handle_sending_message(self, container: MessageContainer, message: MessageSending):
"""处理单个 MessageSending 消息 (包含 set_reply 逻辑)"""
try:
_ = message.update_thinking_time() # 更新思考时间
thinking_start_time = message.thinking_start_time
now_time = time.time()
# logger.debug(f"thinking_start_time:{thinking_start_time},now_time:{now_time}")
thinking_messages_count, thinking_messages_length = count_messages_between(
start_time=thinking_start_time, end_time=now_time, stream_id=message.chat_stream.stream_id
)
if (
message.is_head
and (thinking_messages_count > 3 or thinking_messages_length > 200)
and not message.is_private_message()
):
logger.debug(
f"[{message.chat_stream.stream_id}] 应用 set_reply 逻辑: {message.processed_plain_text[:20]}..."
)
message.build_reply()
# --- 结束条件 set_reply ---
await message.process() # 预处理消息内容
# logger.debug(f"{message}")
# 使用全局 message_sender 实例
await send_message(message)
await self.storage.store_message(message, message.chat_stream)
# 移除消息要在发送 *之后*
container.remove_message(message)
# logger.debug(f"[{message.chat_stream.stream_id}] Sent and removed message: {message.message_info.message_id}")
except Exception as e:
logger.error(
f"[{message.chat_stream.stream_id}] 处理发送消息 {getattr(message.message_info, 'message_id', 'N/A')} 时出错: {e}"
)
logger.exception("详细错误信息:")
# 考虑是否移除出错的消息,防止无限循环
removed = container.remove_message(message)
if removed:
logger.warning(f"[{message.chat_stream.stream_id}] 已移除处理出错的消息。")
async def _process_chat_messages(self, chat_id: str):
"""处理单个聊天流消息 (合并后的逻辑)"""
container = await self.get_container(chat_id) # 获取容器是异步的了
if container.has_messages():
message_earliest = container.get_earliest_message()
if not message_earliest: # 如果最早消息为空,则退出
return
if isinstance(message_earliest, MessageThinking):
# --- 处理思考消息 (来自旧 sender) ---
message_earliest.update_thinking_time()
thinking_time = message_earliest.thinking_time
# 减少控制台刷新频率或只在时间显著变化时打印
if int(thinking_time) % 5 == 0: # 每5秒打印一次
print(
f"消息 {message_earliest.message_info.message_id} 正在思考中,已思考 {int(thinking_time)}\r",
end="",
flush=True,
)
elif isinstance(message_earliest, MessageSending):
# --- 处理发送消息 ---
await self._handle_sending_message(container, message_earliest)
# --- 处理超时发送消息 (来自旧 sender) ---
# 在处理完最早的消息后,检查是否有超时的发送消息
timeout_sending_messages = container.get_timeout_sending_messages()
if timeout_sending_messages:
logger.debug(f"[{chat_id}] 发现 {len(timeout_sending_messages)} 条超时的发送消息")
for msg in timeout_sending_messages:
# 确保不是刚刚处理过的最早消息 (虽然理论上应该已被移除,但以防万一)
if msg is message_earliest:
continue
logger.info(f"[{chat_id}] 处理超时发送消息: {msg.message_info.message_id}")
await self._handle_sending_message(container, msg) # 复用处理逻辑
async def _start_processor_loop(self):
"""消息处理器主循环"""
while self._running:
tasks = []
# 使用异步锁保护迭代器创建过程
async with self._container_lock:
# 创建 keys 的快照以安全迭代
chat_ids = list(self.containers.keys())
for chat_id in chat_ids:
# 为每个 chat_id 创建一个处理任务
tasks.append(asyncio.create_task(self._process_chat_messages(chat_id)))
if tasks:
try:
# 等待当前批次的所有任务完成
await asyncio.gather(*tasks)
except Exception as e:
logger.error(f"消息处理循环 gather 出错: {e}")
# 等待一小段时间避免CPU空转
try:
await asyncio.sleep(0.1) # 稍微降低轮询频率
except asyncio.CancelledError:
logger.info("Processor loop sleep cancelled.")
break # 退出循环
logger.info("MessageManager processor loop finished.")
# --- 创建全局实例 ---
message_manager = MessageManager()
message_sender = MessageSender()
# --- 结束全局实例 ---