Merge branch 'dev' of https://github.com/Dax233/MaiMBot into issue#814
This commit is contained in:
@@ -1,7 +1,7 @@
|
||||
from .emoji_manager import emoji_manager
|
||||
from ..person_info.relationship_manager import relationship_manager
|
||||
from .chat_stream import chat_manager
|
||||
from .messagesender import message_manager
|
||||
from .message_sender import message_manager
|
||||
from ..storage.storage import MessageStorage
|
||||
|
||||
|
||||
|
||||
@@ -3,11 +3,10 @@ from ...config.config import global_config
|
||||
from .message import MessageRecv
|
||||
from ..PFC.pfc_manager import PFCManager
|
||||
from .chat_stream import chat_manager
|
||||
from ..chat_module.only_process.only_message_process import MessageProcessor
|
||||
from .only_message_process import MessageProcessor
|
||||
|
||||
from src.common.logger import get_module_logger, CHAT_STYLE_CONFIG, LogConfig
|
||||
from ..chat_module.reasoning_chat.reasoning_chat import ReasoningChat
|
||||
from ..chat_module.heartFC_chat.heartFC_processor import HeartFCProcessor
|
||||
from ..heartFC_chat.heartflow_processor import HeartFCProcessor
|
||||
from ..utils.prompt_builder import Prompt, global_prompt_manager
|
||||
import traceback
|
||||
|
||||
@@ -27,8 +26,7 @@ class ChatBot:
|
||||
self.bot = None # bot 实例引用
|
||||
self._started = False
|
||||
self.mood_manager = MoodManager.get_instance() # 获取情绪管理器单例
|
||||
self.reasoning_chat = ReasoningChat()
|
||||
self.heartFC_processor = HeartFCProcessor() # 新增
|
||||
self.heartflow_processor = HeartFCProcessor() # 新增
|
||||
|
||||
# 创建初始化PFC管理器的任务,会在_ensure_started时执行
|
||||
self.only_process_chat = MessageProcessor()
|
||||
@@ -53,18 +51,10 @@ class ChatBot:
|
||||
|
||||
async def message_process(self, message_data: str) -> None:
|
||||
"""处理转化后的统一格式消息
|
||||
根据global_config.response_mode选择不同的回复模式:
|
||||
1. heart_flow模式:使用思维流系统进行回复
|
||||
- 包含思维流状态管理
|
||||
- 在回复前进行观察和状态更新
|
||||
- 回复后更新思维流状态
|
||||
|
||||
2. reasoning模式:使用推理系统进行回复
|
||||
- 直接使用意愿管理器计算回复概率
|
||||
- 没有思维流相关的状态管理
|
||||
- 更简单直接的回复逻辑
|
||||
|
||||
所有模式都包含:
|
||||
heart_flow模式:使用思维流系统进行回复
|
||||
- 包含思维流状态管理
|
||||
- 在回复前进行观察和状态更新
|
||||
- 回复后更新思维流状态
|
||||
- 消息过滤
|
||||
- 记忆激活
|
||||
- 意愿计算
|
||||
@@ -92,6 +82,10 @@ class ChatBot:
|
||||
logger.debug(f"用户{userinfo.user_id}被禁止回复")
|
||||
return
|
||||
|
||||
if groupinfo.group_id not in global_config.talk_allowed_groups:
|
||||
logger.debug(f"群{groupinfo.group_id}被禁止回复")
|
||||
return
|
||||
|
||||
if message.message_info.template_info and not message.message_info.template_info.template_default:
|
||||
template_group_name = message.message_info.template_info.template_name
|
||||
template_items = message.message_info.template_info.template_items
|
||||
@@ -119,9 +113,9 @@ class ChatBot:
|
||||
await self.only_process_chat.process_message(message)
|
||||
await self._create_pfc_chat(message)
|
||||
else:
|
||||
await self.heartFC_processor.process_message(message_data)
|
||||
await self.heartflow_processor.process_message(message_data)
|
||||
else:
|
||||
await self.heartFC_processor.process_message(message_data)
|
||||
await self.heartflow_processor.process_message(message_data)
|
||||
|
||||
if template_group_name:
|
||||
async with global_prompt_manager.async_message_scope(template_group_name):
|
||||
|
||||
@@ -14,9 +14,14 @@ from ...config.config import global_config
|
||||
from ..chat.utils import get_embedding
|
||||
from ..chat.utils_image import ImageManager, image_path_to_base64
|
||||
from ..models.utils_model import LLMRequest
|
||||
from src.common.logger import get_module_logger
|
||||
from src.common.logger import get_module_logger, LogConfig, EMOJI_STYLE_CONFIG
|
||||
|
||||
logger = get_module_logger("emoji")
|
||||
emoji_log_config = LogConfig(
|
||||
console_format=EMOJI_STYLE_CONFIG["console_format"],
|
||||
file_format=EMOJI_STYLE_CONFIG["file_format"],
|
||||
)
|
||||
|
||||
logger = get_module_logger("emoji", config=emoji_log_config)
|
||||
|
||||
|
||||
image_manager = ImageManager()
|
||||
|
||||
@@ -290,6 +290,7 @@ class MessageSending(MessageProcessBase):
|
||||
is_head: bool = False,
|
||||
is_emoji: bool = False,
|
||||
thinking_start_time: float = 0,
|
||||
apply_set_reply_logic: bool = False,
|
||||
):
|
||||
# 调用父类初始化
|
||||
super().__init__(
|
||||
@@ -306,6 +307,7 @@ class MessageSending(MessageProcessBase):
|
||||
self.reply_to_message_id = reply.message_info.message_id if reply else None
|
||||
self.is_head = is_head
|
||||
self.is_emoji = is_emoji
|
||||
self.apply_set_reply_logic = apply_set_reply_logic
|
||||
|
||||
def set_reply(self, reply: Optional["MessageRecv"] = None) -> None:
|
||||
"""设置回复消息"""
|
||||
|
||||
348
src/plugins/chat/message_sender.py
Normal file
348
src/plugins/chat/message_sender.py
Normal file
@@ -0,0 +1,348 @@
|
||||
# src/plugins/chat/message_sender.py
|
||||
import asyncio
|
||||
import time
|
||||
from typing import Dict, List, Optional, Union
|
||||
|
||||
from src.common.logger import get_module_logger
|
||||
|
||||
# from ...common.database import db # 数据库依赖似乎不需要了,注释掉
|
||||
from ..message.api import global_api
|
||||
from .message import MessageSending, MessageThinking, MessageSet
|
||||
|
||||
from ..storage.storage import MessageStorage
|
||||
from ...config.config import global_config
|
||||
from .utils import truncate_message, calculate_typing_time, count_messages_between
|
||||
|
||||
from src.common.logger import LogConfig, SENDER_STYLE_CONFIG
|
||||
|
||||
# 定义日志配置
|
||||
sender_config = LogConfig(
|
||||
# 使用消息发送专用样式
|
||||
console_format=SENDER_STYLE_CONFIG["console_format"],
|
||||
file_format=SENDER_STYLE_CONFIG["file_format"],
|
||||
)
|
||||
|
||||
logger = get_module_logger("msg_sender", config=sender_config)
|
||||
|
||||
|
||||
class MessageSender:
|
||||
"""发送器 (不再是单例)"""
|
||||
|
||||
def __init__(self):
|
||||
self.message_interval = (0.5, 1) # 消息间隔时间范围(秒)
|
||||
self.last_send_time = 0
|
||||
self._current_bot = None
|
||||
|
||||
def set_bot(self, bot):
|
||||
"""设置当前bot实例"""
|
||||
pass
|
||||
|
||||
async def send_via_ws(self, message: MessageSending) -> None:
|
||||
"""通过 WebSocket 发送消息"""
|
||||
try:
|
||||
await global_api.send_message(message)
|
||||
except Exception as e:
|
||||
logger.error(f"WS发送失败: {e}")
|
||||
raise ValueError(f"未找到平台:{message.message_info.platform} 的url配置,请检查配置文件") from e
|
||||
|
||||
async def send_message(
|
||||
self,
|
||||
message: MessageSending,
|
||||
) -> None:
|
||||
"""发送消息(核心发送逻辑)"""
|
||||
|
||||
# --- 添加计算打字和延迟的逻辑 (从 heartflow_message_sender 移动并调整) ---
|
||||
typing_time = calculate_typing_time(
|
||||
input_string=message.processed_plain_text,
|
||||
thinking_start_time=message.thinking_start_time,
|
||||
is_emoji=message.is_emoji,
|
||||
)
|
||||
# logger.trace(f"{message.processed_plain_text},{typing_time},计算输入时间结束") # 减少日志
|
||||
await asyncio.sleep(typing_time)
|
||||
# logger.trace(f"{message.processed_plain_text},{typing_time},等待输入时间结束") # 减少日志
|
||||
# --- 结束打字延迟 ---
|
||||
|
||||
message_json = message.to_dict()
|
||||
message_preview = truncate_message(message.processed_plain_text)
|
||||
|
||||
try:
|
||||
end_point = global_config.api_urls.get(message.message_info.platform, None)
|
||||
if end_point:
|
||||
try:
|
||||
await global_api.send_message_rest(end_point, message_json)
|
||||
except Exception as e:
|
||||
logger.error(f"REST发送失败: {str(e)}")
|
||||
logger.info(f"[{message.chat_stream.stream_id}] 尝试使用WS发送")
|
||||
await self.send_via_ws(message)
|
||||
else:
|
||||
await self.send_via_ws(message)
|
||||
logger.success(f"发送消息 '{message_preview}' 成功") # 调整日志格式
|
||||
except Exception as e:
|
||||
logger.error(f"发送消息 '{message_preview}' 失败: {str(e)}")
|
||||
|
||||
|
||||
class MessageContainer:
|
||||
"""单个聊天流的发送/思考消息容器"""
|
||||
|
||||
def __init__(self, chat_id: str, max_size: int = 100):
|
||||
self.chat_id = chat_id
|
||||
self.max_size = max_size
|
||||
self.messages: List[Union[MessageThinking, MessageSending]] = [] # 明确类型
|
||||
self.last_send_time = 0
|
||||
self.thinking_wait_timeout = 20 # 思考等待超时时间(秒) - 从旧 sender 合并
|
||||
|
||||
def count_thinking_messages(self) -> int:
|
||||
"""计算当前容器中思考消息的数量"""
|
||||
return sum(1 for msg in self.messages if isinstance(msg, MessageThinking))
|
||||
|
||||
def get_timeout_sending_messages(self) -> List[MessageSending]:
|
||||
"""获取所有超时的MessageSending对象(思考时间超过20秒),按thinking_start_time排序 - 从旧 sender 合并"""
|
||||
current_time = time.time()
|
||||
timeout_messages = []
|
||||
|
||||
for msg in self.messages:
|
||||
# 只检查 MessageSending 类型
|
||||
if isinstance(msg, MessageSending):
|
||||
# 确保 thinking_start_time 有效
|
||||
if msg.thinking_start_time and current_time - msg.thinking_start_time > self.thinking_wait_timeout:
|
||||
timeout_messages.append(msg)
|
||||
|
||||
# 按thinking_start_time排序,时间早的在前面
|
||||
timeout_messages.sort(key=lambda x: x.thinking_start_time)
|
||||
return timeout_messages
|
||||
|
||||
def get_earliest_message(self) -> Optional[Union[MessageThinking, MessageSending]]:
|
||||
"""获取thinking_start_time最早的消息对象"""
|
||||
if not self.messages:
|
||||
return None
|
||||
earliest_time = float("inf")
|
||||
earliest_message = None
|
||||
for msg in self.messages:
|
||||
# 确保消息有 thinking_start_time 属性
|
||||
msg_time = getattr(msg, "thinking_start_time", float("inf"))
|
||||
if msg_time < earliest_time:
|
||||
earliest_time = msg_time
|
||||
earliest_message = msg
|
||||
return earliest_message
|
||||
|
||||
def add_message(self, message: Union[MessageThinking, MessageSending, MessageSet]) -> None:
|
||||
"""添加消息到队列"""
|
||||
if isinstance(message, MessageSet):
|
||||
for single_message in message.messages:
|
||||
self.messages.append(single_message)
|
||||
else:
|
||||
self.messages.append(message)
|
||||
|
||||
def remove_message(self, message_to_remove: Union[MessageThinking, MessageSending]) -> bool:
|
||||
"""移除指定的消息对象,如果消息存在则返回True,否则返回False"""
|
||||
try:
|
||||
_initial_len = len(self.messages)
|
||||
# 使用列表推导式或 filter 创建新列表,排除要删除的元素
|
||||
# self.messages = [msg for msg in self.messages if msg is not message_to_remove]
|
||||
# 或者直接 remove (如果确定对象唯一性)
|
||||
if message_to_remove in self.messages:
|
||||
self.messages.remove(message_to_remove)
|
||||
return True
|
||||
# logger.debug(f"Removed message {getattr(message_to_remove, 'message_info', {}).get('message_id', 'UNKNOWN')}. Old len: {initial_len}, New len: {len(self.messages)}")
|
||||
# return len(self.messages) < initial_len
|
||||
return False
|
||||
|
||||
except Exception as e:
|
||||
logger.exception(f"移除消息时发生错误: {e}")
|
||||
return False
|
||||
|
||||
def has_messages(self) -> bool:
|
||||
"""检查是否有待发送的消息"""
|
||||
return bool(self.messages)
|
||||
|
||||
def get_all_messages(self) -> List[Union[MessageSending, MessageThinking]]:
|
||||
"""获取所有消息"""
|
||||
return list(self.messages) # 返回副本
|
||||
|
||||
|
||||
class MessageManager:
|
||||
"""管理所有聊天流的消息容器 (不再是单例)"""
|
||||
|
||||
def __init__(self):
|
||||
self.containers: Dict[str, MessageContainer] = {}
|
||||
self.storage = MessageStorage() # 添加 storage 实例
|
||||
self._running = True # 处理器运行状态
|
||||
self._container_lock = asyncio.Lock() # 保护 containers 字典的锁
|
||||
# self.message_sender = MessageSender() # 创建发送器实例 (改为全局实例)
|
||||
|
||||
async def start(self):
|
||||
"""启动后台处理器任务。"""
|
||||
# 检查是否已有任务在运行,避免重复启动
|
||||
if hasattr(self, "_processor_task") and not self._processor_task.done():
|
||||
logger.warning("Processor task already running.")
|
||||
return
|
||||
self._processor_task = asyncio.create_task(self._start_processor_loop())
|
||||
logger.info("MessageManager processor task started.")
|
||||
|
||||
def stop(self):
|
||||
"""停止后台处理器任务。"""
|
||||
self._running = False
|
||||
if hasattr(self, "_processor_task") and not self._processor_task.done():
|
||||
self._processor_task.cancel()
|
||||
logger.info("MessageManager processor task stopping.")
|
||||
else:
|
||||
logger.info("MessageManager processor task not running or already stopped.")
|
||||
|
||||
async def get_container(self, chat_id: str) -> MessageContainer:
|
||||
"""获取或创建聊天流的消息容器 (异步,使用锁)"""
|
||||
async with self._container_lock:
|
||||
if chat_id not in self.containers:
|
||||
self.containers[chat_id] = MessageContainer(chat_id)
|
||||
return self.containers[chat_id]
|
||||
|
||||
async def add_message(self, message: Union[MessageThinking, MessageSending, MessageSet]) -> None:
|
||||
"""添加消息到对应容器"""
|
||||
chat_stream = message.chat_stream
|
||||
if not chat_stream:
|
||||
logger.error("消息缺少 chat_stream,无法添加到容器")
|
||||
return # 或者抛出异常
|
||||
container = await self.get_container(chat_stream.stream_id)
|
||||
container.add_message(message)
|
||||
|
||||
def check_if_sending_message_exist(self, chat_id, thinking_id):
|
||||
"""检查指定聊天流的容器中是否存在具有特定 thinking_id 的 MessageSending 消息 或 emoji 消息"""
|
||||
# 这个方法现在是非异步的,因为它只读取数据
|
||||
container = self.containers.get(chat_id) # 直接 get,因为读取不需要锁
|
||||
if container and container.has_messages():
|
||||
for message in container.get_all_messages():
|
||||
if isinstance(message, MessageSending):
|
||||
msg_id = getattr(message.message_info, "message_id", None)
|
||||
# 检查 message_id 是否匹配 thinking_id 或以 "me" 开头 (emoji)
|
||||
if msg_id == thinking_id or (msg_id and msg_id.startswith("me")):
|
||||
# logger.debug(f"检查到存在相同thinking_id或emoji的消息: {msg_id} for {thinking_id}")
|
||||
return True
|
||||
return False
|
||||
|
||||
async def _handle_sending_message(self, container: MessageContainer, message: MessageSending):
|
||||
"""处理单个 MessageSending 消息 (包含 set_reply 逻辑)"""
|
||||
try:
|
||||
_ = message.update_thinking_time() # 更新思考时间
|
||||
thinking_start_time = message.thinking_start_time
|
||||
now_time = time.time()
|
||||
thinking_messages_count, thinking_messages_length = count_messages_between(
|
||||
start_time=thinking_start_time, end_time=now_time, stream_id=message.chat_stream.stream_id
|
||||
)
|
||||
|
||||
# --- 条件应用 set_reply 逻辑 ---
|
||||
if (
|
||||
message.apply_set_reply_logic # 检查标记
|
||||
and message.is_head
|
||||
and (thinking_messages_count > 4 or thinking_messages_length > 250)
|
||||
and not message.is_private_message()
|
||||
):
|
||||
logger.debug(
|
||||
f"[{message.chat_stream.stream_id}] 应用 set_reply 逻辑: {message.processed_plain_text[:20]}..."
|
||||
)
|
||||
message.set_reply()
|
||||
# --- 结束条件 set_reply ---
|
||||
|
||||
await message.process() # 预处理消息内容
|
||||
|
||||
# 使用全局 message_sender 实例
|
||||
await message_sender.send_message(message)
|
||||
await self.storage.store_message(message, message.chat_stream)
|
||||
|
||||
# 移除消息要在发送 *之后*
|
||||
container.remove_message(message)
|
||||
# logger.debug(f"[{message.chat_stream.stream_id}] Sent and removed message: {message.message_info.message_id}")
|
||||
|
||||
except Exception as e:
|
||||
logger.error(
|
||||
f"[{message.chat_stream.stream_id}] 处理发送消息 {getattr(message.message_info, 'message_id', 'N/A')} 时出错: {e}"
|
||||
)
|
||||
logger.exception("详细错误信息:")
|
||||
# 考虑是否移除出错的消息,防止无限循环
|
||||
removed = container.remove_message(message)
|
||||
if removed:
|
||||
logger.warning(f"[{message.chat_stream.stream_id}] 已移除处理出错的消息。")
|
||||
|
||||
async def _process_chat_messages(self, chat_id: str):
|
||||
"""处理单个聊天流消息 (合并后的逻辑)"""
|
||||
container = await self.get_container(chat_id) # 获取容器是异步的了
|
||||
|
||||
if container.has_messages():
|
||||
message_earliest = container.get_earliest_message()
|
||||
|
||||
if not message_earliest: # 如果最早消息为空,则退出
|
||||
return
|
||||
|
||||
if isinstance(message_earliest, MessageThinking):
|
||||
# --- 处理思考消息 (来自旧 sender) ---
|
||||
message_earliest.update_thinking_time()
|
||||
thinking_time = message_earliest.thinking_time
|
||||
# 减少控制台刷新频率或只在时间显著变化时打印
|
||||
if int(thinking_time) % 5 == 0: # 每5秒打印一次
|
||||
print(
|
||||
f"消息 {message_earliest.message_info.message_id} 正在思考中,已思考 {int(thinking_time)} 秒\r",
|
||||
end="",
|
||||
flush=True,
|
||||
)
|
||||
|
||||
# 检查是否超时
|
||||
if thinking_time > global_config.thinking_timeout:
|
||||
logger.warning(
|
||||
f"[{chat_id}] 消息思考超时 ({thinking_time:.1f}秒),移除消息 {message_earliest.message_info.message_id}"
|
||||
)
|
||||
container.remove_message(message_earliest)
|
||||
print() # 超时后换行,避免覆盖下一条日志
|
||||
|
||||
elif isinstance(message_earliest, MessageSending):
|
||||
# --- 处理发送消息 ---
|
||||
await self._handle_sending_message(container, message_earliest)
|
||||
|
||||
# --- 处理超时发送消息 (来自旧 sender) ---
|
||||
# 在处理完最早的消息后,检查是否有超时的发送消息
|
||||
timeout_sending_messages = container.get_timeout_sending_messages()
|
||||
if timeout_sending_messages:
|
||||
logger.debug(f"[{chat_id}] 发现 {len(timeout_sending_messages)} 条超时的发送消息")
|
||||
for msg in timeout_sending_messages:
|
||||
# 确保不是刚刚处理过的最早消息 (虽然理论上应该已被移除,但以防万一)
|
||||
if msg is message_earliest:
|
||||
continue
|
||||
logger.info(f"[{chat_id}] 处理超时发送消息: {msg.message_info.message_id}")
|
||||
await self._handle_sending_message(container, msg) # 复用处理逻辑
|
||||
|
||||
# 清理空容器 (可选)
|
||||
# async with self._container_lock:
|
||||
# if not container.has_messages() and chat_id in self.containers:
|
||||
# logger.debug(f"[{chat_id}] 容器已空,准备移除。")
|
||||
# del self.containers[chat_id]
|
||||
|
||||
async def _start_processor_loop(self):
|
||||
"""消息处理器主循环"""
|
||||
while self._running:
|
||||
tasks = []
|
||||
# 使用异步锁保护迭代器创建过程
|
||||
async with self._container_lock:
|
||||
# 创建 keys 的快照以安全迭代
|
||||
chat_ids = list(self.containers.keys())
|
||||
|
||||
for chat_id in chat_ids:
|
||||
# 为每个 chat_id 创建一个处理任务
|
||||
tasks.append(asyncio.create_task(self._process_chat_messages(chat_id)))
|
||||
|
||||
if tasks:
|
||||
try:
|
||||
# 等待当前批次的所有任务完成
|
||||
await asyncio.gather(*tasks)
|
||||
except Exception as e:
|
||||
logger.error(f"消息处理循环 gather 出错: {e}")
|
||||
|
||||
# 等待一小段时间,避免CPU空转
|
||||
try:
|
||||
await asyncio.sleep(0.1) # 稍微降低轮询频率
|
||||
except asyncio.CancelledError:
|
||||
logger.info("Processor loop sleep cancelled.")
|
||||
break # 退出循环
|
||||
logger.info("MessageManager processor loop finished.")
|
||||
|
||||
|
||||
# --- 创建全局实例 ---
|
||||
message_manager = MessageManager()
|
||||
message_sender = MessageSender()
|
||||
# --- 结束全局实例 ---
|
||||
@@ -1,291 +0,0 @@
|
||||
import asyncio
|
||||
import time
|
||||
from typing import Dict, List, Optional, Union
|
||||
|
||||
from src.common.logger import get_module_logger
|
||||
from ...common.database import db
|
||||
from ..message.api import global_api
|
||||
from .message import MessageSending, MessageThinking, MessageSet
|
||||
|
||||
from ..storage.storage import MessageStorage
|
||||
from ...config.config import global_config
|
||||
from .utils import truncate_message, calculate_typing_time, count_messages_between
|
||||
|
||||
from src.common.logger import LogConfig, SENDER_STYLE_CONFIG
|
||||
|
||||
# 定义日志配置
|
||||
sender_config = LogConfig(
|
||||
# 使用消息发送专用样式
|
||||
console_format=SENDER_STYLE_CONFIG["console_format"],
|
||||
file_format=SENDER_STYLE_CONFIG["file_format"],
|
||||
)
|
||||
|
||||
logger = get_module_logger("msg_sender", config=sender_config)
|
||||
|
||||
|
||||
class MessageSender:
|
||||
"""发送器"""
|
||||
|
||||
def __init__(self):
|
||||
self.message_interval = (0.5, 1) # 消息间隔时间范围(秒)
|
||||
self.last_send_time = 0
|
||||
self._current_bot = None
|
||||
|
||||
def set_bot(self, bot):
|
||||
"""设置当前bot实例"""
|
||||
pass
|
||||
|
||||
@staticmethod
|
||||
def get_recalled_messages(stream_id: str) -> list:
|
||||
"""获取所有撤回的消息"""
|
||||
recalled_messages = []
|
||||
|
||||
recalled_messages = list(db.recalled_messages.find({"stream_id": stream_id}, {"message_id": 1}))
|
||||
# 按thinking_start_time排序,时间早的在前面
|
||||
return recalled_messages
|
||||
|
||||
@staticmethod
|
||||
async def send_via_ws(message: MessageSending) -> None:
|
||||
try:
|
||||
await global_api.send_message(message)
|
||||
except Exception as e:
|
||||
raise ValueError(f"未找到平台:{message.message_info.platform} 的url配置,请检查配置文件") from e
|
||||
|
||||
async def send_message(
|
||||
self,
|
||||
message: MessageSending,
|
||||
) -> None:
|
||||
"""发送消息"""
|
||||
|
||||
if isinstance(message, MessageSending):
|
||||
recalled_messages = self.get_recalled_messages(message.chat_stream.stream_id)
|
||||
is_recalled = False
|
||||
for recalled_message in recalled_messages:
|
||||
if message.reply_to_message_id == recalled_message["message_id"]:
|
||||
is_recalled = True
|
||||
logger.warning(f"消息“{message.processed_plain_text}”已被撤回,不发送")
|
||||
break
|
||||
if not is_recalled:
|
||||
# print(message.processed_plain_text + str(message.is_emoji))
|
||||
typing_time = calculate_typing_time(
|
||||
input_string=message.processed_plain_text,
|
||||
thinking_start_time=message.thinking_start_time,
|
||||
is_emoji=message.is_emoji,
|
||||
)
|
||||
logger.trace(f"{message.processed_plain_text},{typing_time},计算输入时间结束")
|
||||
await asyncio.sleep(typing_time)
|
||||
logger.trace(f"{message.processed_plain_text},{typing_time},等待输入时间结束")
|
||||
|
||||
message_json = message.to_dict()
|
||||
|
||||
message_preview = truncate_message(message.processed_plain_text)
|
||||
try:
|
||||
end_point = global_config.api_urls.get(message.message_info.platform, None)
|
||||
if end_point:
|
||||
# logger.info(f"发送消息到{end_point}")
|
||||
# logger.info(message_json)
|
||||
try:
|
||||
await global_api.send_message_rest(end_point, message_json)
|
||||
except Exception as e:
|
||||
logger.error(f"REST方式发送失败,出现错误: {str(e)}")
|
||||
logger.info("尝试使用ws发送")
|
||||
await self.send_via_ws(message)
|
||||
else:
|
||||
await self.send_via_ws(message)
|
||||
logger.success(f"发送消息“{message_preview}”成功")
|
||||
except Exception as e:
|
||||
logger.error(f"发送消息“{message_preview}”失败: {str(e)}")
|
||||
|
||||
|
||||
class MessageContainer:
|
||||
"""单个聊天流的发送/思考消息容器"""
|
||||
|
||||
def __init__(self, chat_id: str, max_size: int = 100):
|
||||
self.chat_id = chat_id
|
||||
self.max_size = max_size
|
||||
self.messages = []
|
||||
self.last_send_time = 0
|
||||
self.thinking_wait_timeout = 20 # 思考等待超时时间(秒)
|
||||
|
||||
def get_timeout_messages(self) -> List[MessageSending]:
|
||||
"""获取所有超时的Message_Sending对象(思考时间超过20秒),按thinking_start_time排序"""
|
||||
current_time = time.time()
|
||||
timeout_messages = []
|
||||
|
||||
for msg in self.messages:
|
||||
if isinstance(msg, MessageSending):
|
||||
if current_time - msg.thinking_start_time > self.thinking_wait_timeout:
|
||||
timeout_messages.append(msg)
|
||||
|
||||
# 按thinking_start_time排序,时间早的在前面
|
||||
timeout_messages.sort(key=lambda x: x.thinking_start_time)
|
||||
|
||||
return timeout_messages
|
||||
|
||||
def get_earliest_message(self) -> Optional[Union[MessageThinking, MessageSending]]:
|
||||
"""获取thinking_start_time最早的消息对象"""
|
||||
if not self.messages:
|
||||
return None
|
||||
earliest_time = float("inf")
|
||||
earliest_message = None
|
||||
for msg in self.messages:
|
||||
msg_time = msg.thinking_start_time
|
||||
if msg_time < earliest_time:
|
||||
earliest_time = msg_time
|
||||
earliest_message = msg
|
||||
return earliest_message
|
||||
|
||||
def add_message(self, message: Union[MessageThinking, MessageSending]) -> None:
|
||||
"""添加消息到队列"""
|
||||
if isinstance(message, MessageSet):
|
||||
for single_message in message.messages:
|
||||
self.messages.append(single_message)
|
||||
else:
|
||||
self.messages.append(message)
|
||||
|
||||
def remove_message(self, message: Union[MessageThinking, MessageSending]) -> bool:
|
||||
"""移除消息,如果消息存在则返回True,否则返回False"""
|
||||
try:
|
||||
if message in self.messages:
|
||||
self.messages.remove(message)
|
||||
return True
|
||||
return False
|
||||
except Exception:
|
||||
logger.exception("移除消息时发生错误")
|
||||
return False
|
||||
|
||||
def has_messages(self) -> bool:
|
||||
"""检查是否有待发送的消息"""
|
||||
return bool(self.messages)
|
||||
|
||||
def get_all_messages(self) -> List[Union[MessageSending, MessageThinking]]:
|
||||
"""获取所有消息"""
|
||||
return list(self.messages)
|
||||
|
||||
|
||||
class MessageManager:
|
||||
"""管理所有聊天流的消息容器"""
|
||||
|
||||
def __init__(self):
|
||||
self.containers: Dict[str, MessageContainer] = {} # chat_id -> MessageContainer
|
||||
self.storage = MessageStorage()
|
||||
self._running = True
|
||||
|
||||
def get_container(self, chat_id: str) -> MessageContainer:
|
||||
"""获取或创建聊天流的消息容器"""
|
||||
if chat_id not in self.containers:
|
||||
self.containers[chat_id] = MessageContainer(chat_id)
|
||||
return self.containers[chat_id]
|
||||
|
||||
def add_message(self, message: Union[MessageThinking, MessageSending, MessageSet]) -> None:
|
||||
chat_stream = message.chat_stream
|
||||
if not chat_stream:
|
||||
raise ValueError("无法找到对应的聊天流")
|
||||
container = self.get_container(chat_stream.stream_id)
|
||||
container.add_message(message)
|
||||
|
||||
async def process_chat_messages(self, chat_id: str):
|
||||
"""处理聊天流消息"""
|
||||
container = self.get_container(chat_id)
|
||||
if container.has_messages():
|
||||
# print(f"处理有message的容器chat_id: {chat_id}")
|
||||
message_earliest = container.get_earliest_message()
|
||||
|
||||
if isinstance(message_earliest, MessageThinking):
|
||||
"""取得了思考消息"""
|
||||
message_earliest.update_thinking_time()
|
||||
thinking_time = message_earliest.thinking_time
|
||||
# print(thinking_time)
|
||||
print(
|
||||
f"消息正在思考中,已思考{int(thinking_time)}秒\r",
|
||||
end="",
|
||||
flush=True,
|
||||
)
|
||||
|
||||
# 检查是否超时
|
||||
if thinking_time > global_config.thinking_timeout:
|
||||
logger.warning(f"消息思考超时({thinking_time}秒),移除该消息")
|
||||
container.remove_message(message_earliest)
|
||||
|
||||
else:
|
||||
"""取得了发送消息"""
|
||||
thinking_time = message_earliest.update_thinking_time()
|
||||
thinking_start_time = message_earliest.thinking_start_time
|
||||
now_time = time.time()
|
||||
thinking_messages_count, thinking_messages_length = count_messages_between(
|
||||
start_time=thinking_start_time, end_time=now_time, stream_id=message_earliest.chat_stream.stream_id
|
||||
)
|
||||
# print(thinking_time)
|
||||
# print(thinking_messages_count)
|
||||
# print(thinking_messages_length)
|
||||
|
||||
if (
|
||||
message_earliest.is_head
|
||||
and (thinking_messages_count > 4 or thinking_messages_length > 250)
|
||||
and not message_earliest.is_private_message() # 避免在私聊时插入reply
|
||||
):
|
||||
logger.debug(f"设置回复消息{message_earliest.processed_plain_text}")
|
||||
message_earliest.set_reply()
|
||||
|
||||
await message_earliest.process()
|
||||
|
||||
# print(f"message_earliest.thinking_start_tim22222e:{message_earliest.thinking_start_time}")
|
||||
|
||||
await message_sender.send_message(message_earliest)
|
||||
|
||||
await self.storage.store_message(message_earliest, message_earliest.chat_stream)
|
||||
|
||||
container.remove_message(message_earliest)
|
||||
|
||||
message_timeout = container.get_timeout_messages()
|
||||
if message_timeout:
|
||||
logger.debug(f"发现{len(message_timeout)}条超时消息")
|
||||
for msg in message_timeout:
|
||||
if msg == message_earliest:
|
||||
continue
|
||||
|
||||
try:
|
||||
thinking_time = msg.update_thinking_time()
|
||||
thinking_start_time = msg.thinking_start_time
|
||||
now_time = time.time()
|
||||
thinking_messages_count, thinking_messages_length = count_messages_between(
|
||||
start_time=thinking_start_time, end_time=now_time, stream_id=msg.chat_stream.stream_id
|
||||
)
|
||||
# print(thinking_time)
|
||||
# print(thinking_messages_count)
|
||||
# print(thinking_messages_length)
|
||||
if (
|
||||
msg.is_head
|
||||
and (thinking_messages_count > 4 or thinking_messages_length > 250)
|
||||
and not msg.is_private_message() # 避免在私聊时插入reply
|
||||
):
|
||||
logger.debug(f"设置回复消息{msg.processed_plain_text}")
|
||||
msg.set_reply()
|
||||
|
||||
await msg.process()
|
||||
|
||||
await message_sender.send_message(msg)
|
||||
|
||||
await self.storage.store_message(msg, msg.chat_stream)
|
||||
|
||||
if not container.remove_message(msg):
|
||||
logger.warning("尝试删除不存在的消息")
|
||||
except Exception:
|
||||
logger.exception("处理超时消息时发生错误")
|
||||
continue
|
||||
|
||||
async def start_processor(self):
|
||||
"""启动消息处理器"""
|
||||
while self._running:
|
||||
await asyncio.sleep(1)
|
||||
tasks = []
|
||||
for chat_id in self.containers.keys():
|
||||
tasks.append(self.process_chat_messages(chat_id))
|
||||
|
||||
await asyncio.gather(*tasks)
|
||||
|
||||
|
||||
# 创建全局消息管理器实例
|
||||
message_manager = MessageManager()
|
||||
# 创建全局发送器实例
|
||||
message_sender = MessageSender()
|
||||
65
src/plugins/chat/only_message_process.py
Normal file
65
src/plugins/chat/only_message_process.py
Normal file
@@ -0,0 +1,65 @@
|
||||
from src.common.logger import get_module_logger
|
||||
from src.plugins.chat.message import MessageRecv
|
||||
from src.plugins.storage.storage import MessageStorage
|
||||
from src.config.config import global_config
|
||||
from datetime import datetime
|
||||
|
||||
logger = get_module_logger("pfc_message_processor")
|
||||
|
||||
|
||||
class MessageProcessor:
|
||||
"""消息处理器,负责处理接收到的消息并存储"""
|
||||
|
||||
def __init__(self):
|
||||
self.storage = MessageStorage()
|
||||
|
||||
@staticmethod
|
||||
def _check_ban_words(text: str, chat, userinfo) -> bool:
|
||||
"""检查消息中是否包含过滤词"""
|
||||
for word in global_config.ban_words:
|
||||
if word in text:
|
||||
logger.info(
|
||||
f"[{chat.group_info.group_name if chat.group_info else '私聊'}]{userinfo.user_nickname}:{text}"
|
||||
)
|
||||
logger.info(f"[过滤词识别]消息中含有{word},filtered")
|
||||
return True
|
||||
return False
|
||||
|
||||
@staticmethod
|
||||
def _check_ban_regex(text: str, chat, userinfo) -> bool:
|
||||
"""检查消息是否匹配过滤正则表达式"""
|
||||
for pattern in global_config.ban_msgs_regex:
|
||||
if pattern.search(text):
|
||||
logger.info(
|
||||
f"[{chat.group_info.group_name if chat.group_info else '私聊'}]{userinfo.user_nickname}:{text}"
|
||||
)
|
||||
logger.info(f"[正则表达式过滤]消息匹配到{pattern},filtered")
|
||||
return True
|
||||
return False
|
||||
|
||||
async def process_message(self, message: MessageRecv) -> None:
|
||||
"""处理消息并存储
|
||||
|
||||
Args:
|
||||
message: 消息对象
|
||||
"""
|
||||
userinfo = message.message_info.user_info
|
||||
chat = message.chat_stream
|
||||
|
||||
# 处理消息
|
||||
await message.process()
|
||||
|
||||
# 过滤词/正则表达式过滤
|
||||
if self._check_ban_words(message.processed_plain_text, chat, userinfo) or self._check_ban_regex(
|
||||
message.raw_message, chat, userinfo
|
||||
):
|
||||
return
|
||||
|
||||
# 存储消息
|
||||
await self.storage.store_message(message, chat)
|
||||
|
||||
# 打印消息信息
|
||||
mes_name = chat.group_info.group_name if chat.group_info else "私聊"
|
||||
# 将时间戳转换为datetime对象
|
||||
current_time = datetime.fromtimestamp(message.message_info.time).strftime("%H:%M:%S")
|
||||
logger.info(f"[{current_time}][{mes_name}]{message.message_info.user_info.user_nickname}: {message.processed_plain_text}")
|
||||
@@ -218,7 +218,7 @@ class ImageManager:
|
||||
"timestamp": timestamp,
|
||||
}
|
||||
db.images.update_one({"hash": image_hash}, {"$set": image_doc}, upsert=True)
|
||||
logger.success(f"保存图片: {file_path}")
|
||||
logger.trace(f"保存图片: {file_path}")
|
||||
except Exception as e:
|
||||
logger.error(f"保存图片文件失败: {str(e)}")
|
||||
|
||||
|
||||
Reference in New Issue
Block a user