diff --git a/src/chat/focus_chat/hfc_utils.py b/src/chat/focus_chat/hfc_utils.py index a7a4fe122..f7b9fdc93 100644 --- a/src/chat/focus_chat/hfc_utils.py +++ b/src/chat/focus_chat/hfc_utils.py @@ -5,8 +5,6 @@ from src.chat.message_receive.message import UserInfo from src.common.logger import get_logger from typing import Dict, Any from src.config.config import global_config -from src.chat.message_receive.message import MessageThinking -from src.chat.message_receive.normal_message_sender import message_manager from src.common.message_repository import count_messages @@ -86,40 +84,6 @@ class CycleDetail: self.loop_action_info = loop_info["loop_action_info"] -async def create_thinking_message_from_dict(message_data: dict, chat_stream: ChatStream, thinking_id: str) -> str: - """创建思考消息""" - bot_user_info = UserInfo( - user_id=global_config.bot.qq_account, - user_nickname=global_config.bot.nickname, - platform=message_data.get("chat_info_platform"), - ) - - thinking_message = MessageThinking( - message_id=thinking_id, - chat_stream=chat_stream, - bot_user_info=bot_user_info, - reply=None, - thinking_start_time=time.time(), - timestamp=time.time(), - ) - - await message_manager.add_message(thinking_message) - return thinking_id - -async def cleanup_thinking_message_by_id(chat_id: str, thinking_id: str, log_prefix: str): - """根据ID清理思考消息""" - try: - container = await message_manager.get_container(chat_id) - if container: - for msg in container.messages[:]: - if isinstance(msg, MessageThinking) and msg.message_info.message_id == thinking_id: - container.messages.remove(msg) - logger.info(f"{log_prefix}已清理思考消息 {thinking_id}") - break - except Exception as e: - logger.error(f"{log_prefix} 清理思考消息 {thinking_id} 时出错: {e}") - - def get_recent_message_stats(minutes: int = 30, chat_id: str = None) -> dict: """ diff --git a/src/chat/message_receive/__init__.py b/src/chat/message_receive/__init__.py index d01bea726..44b9eee36 100644 --- a/src/chat/message_receive/__init__.py +++ b/src/chat/message_receive/__init__.py @@ -1,12 +1,10 @@ from src.chat.emoji_system.emoji_manager import get_emoji_manager from src.chat.message_receive.chat_stream import get_chat_manager -from src.chat.message_receive.normal_message_sender import message_manager from src.chat.message_receive.storage import MessageStorage __all__ = [ "get_emoji_manager", "get_chat_manager", - "message_manager", "MessageStorage", ] diff --git a/src/chat/message_receive/normal_message_sender.py b/src/chat/message_receive/normal_message_sender.py deleted file mode 100644 index c8bf72107..000000000 --- a/src/chat/message_receive/normal_message_sender.py +++ /dev/null @@ -1,310 +0,0 @@ -# src/plugins/chat/message_sender.py -import asyncio -import time -from asyncio import Task -from typing import Union -from src.common.message.api import get_global_api - -# from ...common.database import db # 数据库依赖似乎不需要了,注释掉 -from .message import MessageSending, MessageThinking, MessageSet - -from src.chat.message_receive.storage import MessageStorage -from ..utils.utils import truncate_message, calculate_typing_time, count_messages_between - -from src.common.logger import get_logger -from rich.traceback import install -import traceback - -install(extra_lines=3) - - -logger = get_logger("sender") - - -async def send_via_ws(message: MessageSending) -> None: - """通过 WebSocket 发送消息""" - try: - await get_global_api().send_message(message) - except Exception as e: - logger.error(f"WS发送失败: {e}") - raise ValueError(f"未找到平台:{message.message_info.platform} 的url配置,请检查配置文件") from e - - -async def send_message( - message: MessageSending, -) -> None: - """发送消息(核心发送逻辑)""" - - # --- 添加计算打字和延迟的逻辑 (从 heartflow_message_sender 移动并调整) --- - typing_time = calculate_typing_time( - input_string=message.processed_plain_text, - thinking_start_time=message.thinking_start_time, - is_emoji=message.is_emoji, - ) - # logger.debug(f"{message.processed_plain_text},{typing_time},计算输入时间结束") # 减少日志 - await asyncio.sleep(typing_time) - # logger.debug(f"{message.processed_plain_text},{typing_time},等待输入时间结束") # 减少日志 - # --- 结束打字延迟 --- - - message_preview = truncate_message(message.processed_plain_text) - - try: - await send_via_ws(message) - logger.info(f"发送消息 '{message_preview}' 成功") # 调整日志格式 - except Exception as e: - logger.error(f"发送消息 '{message_preview}' 失败: {str(e)}") - - -class MessageSender: - """发送器 (不再是单例)""" - - def __init__(self): - self.message_interval = (0.5, 1) # 消息间隔时间范围(秒) - self.last_send_time = 0 - self._current_bot = None - - def set_bot(self, bot): - """设置当前bot实例""" - pass - - -class MessageContainer: - """单个聊天流的发送/思考消息容器""" - - def __init__(self, chat_id: str, max_size: int = 100): - self.chat_id = chat_id - self.max_size = max_size - self.messages: list[MessageThinking | MessageSending] = [] # 明确类型 - self.last_send_time = 0 - self.thinking_wait_timeout = 20 # 思考等待超时时间(秒) - 从旧 sender 合并 - - def count_thinking_messages(self) -> int: - """计算当前容器中思考消息的数量""" - return sum(1 for msg in self.messages if isinstance(msg, MessageThinking)) - - def get_timeout_sending_messages(self) -> list[MessageSending]: - """获取所有超时的MessageSending对象(思考时间超过20秒),按thinking_start_time排序 - 从旧 sender 合并""" - current_time = time.time() - timeout_messages = [] - - for msg in self.messages: - # 只检查 MessageSending 类型 - if isinstance(msg, MessageSending): - # 确保 thinking_start_time 有效 - if msg.thinking_start_time and current_time - msg.thinking_start_time > self.thinking_wait_timeout: - timeout_messages.append(msg) - - # 按thinking_start_time排序,时间早的在前面 - timeout_messages.sort(key=lambda x: x.thinking_start_time) - return timeout_messages - - def get_earliest_message(self): - """获取thinking_start_time最早的消息对象""" - if not self.messages: - return None - earliest_time = float("inf") - earliest_message = None - for msg in self.messages: - # 确保消息有 thinking_start_time 属性 - msg_time = getattr(msg, "thinking_start_time", float("inf")) - if msg_time < earliest_time: - earliest_time = msg_time - earliest_message = msg - return earliest_message - - def add_message(self, message: Union[MessageThinking, MessageSending, MessageSet]): - """添加消息到队列""" - if isinstance(message, MessageSet): - for single_message in message.messages: - self.messages.append(single_message) - else: - self.messages.append(message) - - def remove_message(self, message_to_remove: Union[MessageThinking, MessageSending]): - """移除指定的消息对象,如果消息存在则返回True,否则返回False""" - try: - _initial_len = len(self.messages) - # 使用列表推导式或 message_filter 创建新列表,排除要删除的元素 - # self.messages = [msg for msg in self.messages if msg is not message_to_remove] - # 或者直接 remove (如果确定对象唯一性) - if message_to_remove in self.messages: - self.messages.remove(message_to_remove) - return True - # logger.debug(f"Removed message {getattr(message_to_remove, 'message_info', {}).get('message_id', 'UNKNOWN')}. Old len: {initial_len}, New len: {len(self.messages)}") - # return len(self.messages) < initial_len - return False - - except Exception as e: - logger.exception(f"移除消息时发生错误: {e}") - return False - - def has_messages(self) -> bool: - """检查是否有待发送的消息""" - return bool(self.messages) - - def get_all_messages(self) -> list[MessageThinking | MessageSending]: - """获取所有消息""" - return list(self.messages) # 返回副本 - - -class MessageManager: - """管理所有聊天流的消息容器 (不再是单例)""" - - def __init__(self): - self._processor_task: Task | None = None - self.containers: dict[str, MessageContainer] = {} - self.storage = MessageStorage() # 添加 storage 实例 - self._running = True # 处理器运行状态 - self._container_lock = asyncio.Lock() # 保护 containers 字典的锁 - # self.message_sender = MessageSender() # 创建发送器实例 (改为全局实例) - - async def start(self): - """启动后台处理器任务。""" - # 检查是否已有任务在运行,避免重复启动 - if self._processor_task is not None and not self._processor_task.done(): - logger.warning("Processor task already running.") - return - self._processor_task = asyncio.create_task(self._start_processor_loop()) - logger.debug("MessageManager processor task started.") - - def stop(self): - """停止后台处理器任务。""" - self._running = False - if self._processor_task is not None and not self._processor_task.done(): - self._processor_task.cancel() - logger.debug("MessageManager processor task stopping.") - else: - logger.debug("MessageManager processor task not running or already stopped.") - - async def get_container(self, chat_id: str) -> MessageContainer: - """获取或创建聊天流的消息容器 (异步,使用锁)""" - async with self._container_lock: - if chat_id not in self.containers: - self.containers[chat_id] = MessageContainer(chat_id) - return self.containers[chat_id] - - async def add_message(self, message: Union[MessageThinking, MessageSending, MessageSet]) -> None: - """添加消息到对应容器""" - chat_stream = message.chat_stream - if not chat_stream: - logger.error("消息缺少 chat_stream,无法添加到容器") - return # 或者抛出异常 - container = await self.get_container(chat_stream.stream_id) - container.add_message(message) - - async def _handle_sending_message(self, container: MessageContainer, message: MessageSending): - """处理单个 MessageSending 消息 (包含 set_reply 逻辑)""" - try: - _ = message.update_thinking_time() # 更新思考时间 - thinking_start_time = message.thinking_start_time - now_time = time.time() - # logger.debug(f"thinking_start_time:{thinking_start_time},now_time:{now_time}") - thinking_messages_count, thinking_messages_length = count_messages_between( - start_time=thinking_start_time, end_time=now_time, stream_id=message.chat_stream.stream_id - ) - - if ( - message.is_head - and (thinking_messages_count > 3 or thinking_messages_length > 200) - and not message.is_private_message() - ): - logger.debug( - f"[{message.chat_stream.stream_id}] 应用 set_reply 逻辑: {message.processed_plain_text[:20]}..." - ) - message.build_reply() - # --- 结束条件 set_reply --- - - await message.process() # 预处理消息内容 - - # logger.debug(f"{message}") - - # 使用全局 message_sender 实例 - await send_message(message) - await self.storage.store_message(message, message.chat_stream) - - # 移除消息要在发送 *之后* - container.remove_message(message) - # logger.debug(f"[{message.chat_stream.stream_id}] Sent and removed message: {message.message_info.message_id}") - - except Exception as e: - logger.error( - f"[{message.chat_stream.stream_id}] 处理发送消息 {getattr(message.message_info, 'message_id', 'N/A')} 时出错: {e}" - ) - logger.exception("详细错误信息:") - # 考虑是否移除出错的消息,防止无限循环 - removed = container.remove_message(message) - if removed: - logger.warning(f"[{message.chat_stream.stream_id}] 已移除处理出错的消息。") - - async def _process_chat_messages(self, chat_id: str): - """处理单个聊天流消息 (合并后的逻辑)""" - container = await self.get_container(chat_id) # 获取容器是异步的了 - - if container.has_messages(): - message_earliest = container.get_earliest_message() - - if not message_earliest: # 如果最早消息为空,则退出 - return - - if isinstance(message_earliest, MessageThinking): - # --- 处理思考消息 (来自旧 sender) --- - message_earliest.update_thinking_time() - thinking_time = message_earliest.thinking_time - # 减少控制台刷新频率或只在时间显著变化时打印 - if int(thinking_time) % 5 == 0: # 每5秒打印一次 - print( - f"消息 {message_earliest.message_info.message_id} 正在思考中,已思考 {int(thinking_time)} 秒\r", - end="", - flush=True, - ) - - elif isinstance(message_earliest, MessageSending): - # --- 处理发送消息 --- - await self._handle_sending_message(container, message_earliest) - - # --- 处理超时发送消息 (来自旧 sender) --- - # 在处理完最早的消息后,检查是否有超时的发送消息 - timeout_sending_messages = container.get_timeout_sending_messages() - if timeout_sending_messages: - logger.debug(f"[{chat_id}] 发现 {len(timeout_sending_messages)} 条超时的发送消息") - for msg in timeout_sending_messages: - # 确保不是刚刚处理过的最早消息 (虽然理论上应该已被移除,但以防万一) - if msg is message_earliest: - continue - logger.info(f"[{chat_id}] 处理超时发送消息: {msg.message_info.message_id}") - await self._handle_sending_message(container, msg) # 复用处理逻辑 - - async def _start_processor_loop(self): - """消息处理器主循环""" - while self._running: - tasks = [] - # 使用异步锁保护迭代器创建过程 - async with self._container_lock: - # 创建 keys 的快照以安全迭代 - chat_ids = list(self.containers.keys()) - - for chat_id in chat_ids: - # 为每个 chat_id 创建一个处理任务 - tasks.append(asyncio.create_task(self._process_chat_messages(chat_id))) - - if tasks: - try: - # 等待当前批次的所有任务完成 - await asyncio.gather(*tasks) - except Exception as e: - logger.error(f"消息处理循环 gather 出错: {e}") - print(traceback.format_exc()) - - # 等待一小段时间,避免CPU空转 - try: - await asyncio.sleep(0.1) # 稍微降低轮询频率 - except asyncio.CancelledError: - logger.info("Processor loop sleep cancelled.") - break # 退出循环 - logger.info("MessageManager processor loop finished.") - - -# --- 创建全局实例 --- -message_manager = MessageManager() -message_sender = MessageSender() -# --- 结束全局实例 --- diff --git a/src/main.py b/src/main.py index d7e02dc8f..a457f42e4 100644 --- a/src/main.py +++ b/src/main.py @@ -9,7 +9,6 @@ from src.chat.utils.statistic import OnlineTimeRecordTask, StatisticOutputTask from src.chat.emoji_system.emoji_manager import get_emoji_manager from src.chat.willing.willing_manager import get_willing_manager from src.chat.message_receive.chat_stream import get_chat_manager -from src.chat.message_receive.normal_message_sender import message_manager from src.chat.message_receive.storage import MessageStorage from src.config.config import global_config from src.chat.message_receive.bot import chat_bot @@ -126,9 +125,6 @@ class MainSystem: logger.info("个体特征初始化成功") try: - # 启动全局消息管理器 (负责消息发送/排队) - await message_manager.start() - logger.info("全局消息管理器启动成功") init_time = int(1000 * (time.time() - init_start_time)) logger.info(f"初始化完成,神经元放电{init_time}次")