fix:拯救大兵ruff 2

This commit is contained in:
SengokuCola
2025-04-23 00:43:33 +08:00
parent f2c50d2cd4
commit e2d882ec82
11 changed files with 235 additions and 193 deletions

View File

@@ -4,6 +4,7 @@ import time
from typing import Dict, List, Optional, Union
from src.common.logger import get_module_logger
# from ...common.database import db # 数据库依赖似乎不需要了,注释掉
from ..message.api import global_api
from .message import MessageSending, MessageThinking, MessageSet
@@ -75,7 +76,7 @@ class MessageSender:
await self.send_via_ws(message)
else:
await self.send_via_ws(message)
logger.success(f"发送消息 '{message_preview}' 成功") # 调整日志格式
logger.success(f"发送消息 '{message_preview}' 成功") # 调整日志格式
except Exception as e:
logger.error(f"发送消息 '{message_preview}' 失败: {str(e)}")
@@ -86,7 +87,7 @@ class MessageContainer:
def __init__(self, chat_id: str, max_size: int = 100):
self.chat_id = chat_id
self.max_size = max_size
self.messages: List[Union[MessageThinking, MessageSending]] = [] # 明确类型
self.messages: List[Union[MessageThinking, MessageSending]] = [] # 明确类型
self.last_send_time = 0
self.thinking_wait_timeout = 20 # 思考等待超时时间(秒) - 从旧 sender 合并
@@ -118,7 +119,7 @@ class MessageContainer:
earliest_message = None
for msg in self.messages:
# 确保消息有 thinking_start_time 属性
msg_time = getattr(msg, 'thinking_start_time', float('inf'))
msg_time = getattr(msg, "thinking_start_time", float("inf"))
if msg_time < earliest_time:
earliest_time = msg_time
earliest_message = msg
@@ -156,7 +157,7 @@ class MessageContainer:
def get_all_messages(self) -> List[Union[MessageSending, MessageThinking]]:
"""获取所有消息"""
return list(self.messages) # 返回副本
return list(self.messages) # 返回副本
class MessageManager:
@@ -164,29 +165,28 @@ class MessageManager:
def __init__(self):
self.containers: Dict[str, MessageContainer] = {}
self.storage = MessageStorage() # 添加 storage 实例
self._running = True # 处理器运行状态
self._container_lock = asyncio.Lock() # 保护 containers 字典的锁
self.storage = MessageStorage() # 添加 storage 实例
self._running = True # 处理器运行状态
self._container_lock = asyncio.Lock() # 保护 containers 字典的锁
# self.message_sender = MessageSender() # 创建发送器实例 (改为全局实例)
async def start(self):
"""启动后台处理器任务。"""
# 检查是否已有任务在运行,避免重复启动
if hasattr(self, '_processor_task') and not self._processor_task.done():
logger.warning("Processor task already running.")
return
if hasattr(self, "_processor_task") and not self._processor_task.done():
logger.warning("Processor task already running.")
return
self._processor_task = asyncio.create_task(self._start_processor_loop())
logger.info("MessageManager processor task started.")
def stop(self):
"""停止后台处理器任务。"""
self._running = False
if hasattr(self, '_processor_task') and not self._processor_task.done():
self._processor_task.cancel()
logger.info("MessageManager processor task stopping.")
else:
logger.info("MessageManager processor task not running or already stopped.")
"""停止后台处理器任务。"""
self._running = False
if hasattr(self, "_processor_task") and not self._processor_task.done():
self._processor_task.cancel()
logger.info("MessageManager processor task stopping.")
else:
logger.info("MessageManager processor task not running or already stopped.")
async def get_container(self, chat_id: str) -> MessageContainer:
"""获取或创建聊天流的消息容器 (异步,使用锁)"""
@@ -200,18 +200,18 @@ class MessageManager:
chat_stream = message.chat_stream
if not chat_stream:
logger.error("消息缺少 chat_stream无法添加到容器")
return # 或者抛出异常
return # 或者抛出异常
container = await self.get_container(chat_stream.stream_id)
container.add_message(message)
def check_if_sending_message_exist(self, chat_id, thinking_id):
"""检查指定聊天流的容器中是否存在具有特定 thinking_id 的 MessageSending 消息 或 emoji 消息"""
# 这个方法现在是非异步的,因为它只读取数据
container = self.containers.get(chat_id) # 直接 get因为读取不需要锁
container = self.containers.get(chat_id) # 直接 get因为读取不需要锁
if container and container.has_messages():
for message in container.get_all_messages():
if isinstance(message, MessageSending):
msg_id = getattr(message.message_info, 'message_id', None)
msg_id = getattr(message.message_info, "message_id", None)
# 检查 message_id 是否匹配 thinking_id 或以 "me" 开头 (emoji)
if msg_id == thinking_id or (msg_id and msg_id.startswith("me")):
# logger.debug(f"检查到存在相同thinking_id或emoji的消息: {msg_id} for {thinking_id}")
@@ -221,7 +221,7 @@ class MessageManager:
async def _handle_sending_message(self, container: MessageContainer, message: MessageSending):
"""处理单个 MessageSending 消息 (包含 set_reply 逻辑)"""
try:
_ = message.update_thinking_time() # 更新思考时间
_ = message.update_thinking_time() # 更新思考时间
thinking_start_time = message.thinking_start_time
now_time = time.time()
thinking_messages_count, thinking_messages_length = count_messages_between(
@@ -230,16 +230,18 @@ class MessageManager:
# --- 条件应用 set_reply 逻辑 ---
if (
message.apply_set_reply_logic # 检查标记
message.apply_set_reply_logic # 检查标记
and message.is_head
and (thinking_messages_count > 4 or thinking_messages_length > 250)
and not message.is_private_message()
):
logger.debug(f"[{message.chat_stream.stream_id}] 应用 set_reply 逻辑: {message.processed_plain_text[:20]}...")
logger.debug(
f"[{message.chat_stream.stream_id}] 应用 set_reply 逻辑: {message.processed_plain_text[:20]}..."
)
message.set_reply()
# --- 结束条件 set_reply ---
await message.process() # 预处理消息内容
await message.process() # 预处理消息内容
# 使用全局 message_sender 实例
await message_sender.send_message(message)
@@ -250,22 +252,23 @@ class MessageManager:
# logger.debug(f"[{message.chat_stream.stream_id}] Sent and removed message: {message.message_info.message_id}")
except Exception as e:
logger.error(f"[{message.chat_stream.stream_id}] 处理发送消息 {getattr(message.message_info, 'message_id', 'N/A')} 时出错: {e}")
logger.error(
f"[{message.chat_stream.stream_id}] 处理发送消息 {getattr(message.message_info, 'message_id', 'N/A')} 时出错: {e}"
)
logger.exception("详细错误信息:")
# 考虑是否移除出错的消息,防止无限循环
removed = container.remove_message(message)
if removed:
logger.warning(f"[{message.chat_stream.stream_id}] 已移除处理出错的消息。")
logger.warning(f"[{message.chat_stream.stream_id}] 已移除处理出错的消息。")
async def _process_chat_messages(self, chat_id: str):
"""处理单个聊天流消息 (合并后的逻辑)"""
container = await self.get_container(chat_id) # 获取容器是异步的了
container = await self.get_container(chat_id) # 获取容器是异步的了
if container.has_messages():
message_earliest = container.get_earliest_message()
if not message_earliest: # 如果最早消息为空,则退出
if not message_earliest: # 如果最早消息为空,则退出
return
if isinstance(message_earliest, MessageThinking):
@@ -273,7 +276,7 @@ class MessageManager:
message_earliest.update_thinking_time()
thinking_time = message_earliest.thinking_time
# 减少控制台刷新频率或只在时间显著变化时打印
if int(thinking_time) % 5 == 0: # 每5秒打印一次
if int(thinking_time) % 5 == 0: # 每5秒打印一次
print(
f"消息 {message_earliest.message_info.message_id} 正在思考中,已思考 {int(thinking_time)}\r",
end="",
@@ -282,9 +285,11 @@ class MessageManager:
# 检查是否超时
if thinking_time > global_config.thinking_timeout:
logger.warning(f"[{chat_id}] 消息思考超时 ({thinking_time:.1f}秒),移除消息 {message_earliest.message_info.message_id}")
logger.warning(
f"[{chat_id}] 消息思考超时 ({thinking_time:.1f}秒),移除消息 {message_earliest.message_info.message_id}"
)
container.remove_message(message_earliest)
print() # 超时后换行,避免覆盖下一条日志
print() # 超时后换行,避免覆盖下一条日志
elif isinstance(message_earliest, MessageSending):
# --- 处理发送消息 ---
@@ -300,7 +305,7 @@ class MessageManager:
if msg is message_earliest:
continue
logger.info(f"[{chat_id}] 处理超时发送消息: {msg.message_info.message_id}")
await self._handle_sending_message(container, msg) # 复用处理逻辑
await self._handle_sending_message(container, msg) # 复用处理逻辑
# 清理空容器 (可选)
# async with self._container_lock:
@@ -308,7 +313,6 @@ class MessageManager:
# logger.debug(f"[{chat_id}] 容器已空,准备移除。")
# del self.containers[chat_id]
async def _start_processor_loop(self):
"""消息处理器主循环"""
while self._running:
@@ -319,8 +323,8 @@ class MessageManager:
chat_ids = list(self.containers.keys())
for chat_id in chat_ids:
# 为每个 chat_id 创建一个处理任务
tasks.append(asyncio.create_task(self._process_chat_messages(chat_id)))
# 为每个 chat_id 创建一个处理任务
tasks.append(asyncio.create_task(self._process_chat_messages(chat_id)))
if tasks:
try:
@@ -331,13 +335,14 @@ class MessageManager:
# 等待一小段时间避免CPU空转
try:
await asyncio.sleep(0.1) # 稍微降低轮询频率
await asyncio.sleep(0.1) # 稍微降低轮询频率
except asyncio.CancelledError:
logger.info("Processor loop sleep cancelled.")
break # 退出循环
logger.info("Processor loop sleep cancelled.")
break # 退出循环
logger.info("MessageManager processor loop finished.")
# --- 创建全局实例 ---
message_manager = MessageManager()
message_sender = MessageSender()
# --- 结束全局实例 ---
# --- 结束全局实例 ---