refactor(napcat): 改进连接管理和日志级别优化

- 添加MaiBot连接重试机制,支持非阻塞启动和自动重连
- 优化日志输出级别,减少非关键信息的INFO日志
- 增强消息发送的连接状态检查和故障恢复
- 改进调试模式下的原始消息记录逻辑
This commit is contained in:
tt-P607
2025-09-06 22:47:58 +08:00
parent db3c3ebd63
commit 8a99e7fe63
6 changed files with 97 additions and 36 deletions

View File

@@ -40,6 +40,8 @@ async def message_recv(server_connection: Server.ServerConnection):
asyncio.create_task(notice_handler.set_server_connection(server_connection))
await send_handler.set_server_connection(server_connection)
async for raw_message in server_connection:
# 只在debug模式下记录原始消息
if logger.level <= 10: # DEBUG level
logger.debug(f"{raw_message[:1500]}..." if (len(raw_message) > 1500) else raw_message)
decoded_raw_message: dict = json.loads(raw_message)
try:
@@ -225,13 +227,38 @@ class LauchNapcatAdapterHandler(BaseEventHandler):
await reassembler.start_cleanup_task()
logger.info("开始启动Napcat Adapter")
message_send_instance.maibot_router = router
# 创建单独的异步任务,防止阻塞主线程
asyncio.create_task(self._start_maibot_connection())
asyncio.create_task(napcat_server(self.plugin_config))
asyncio.create_task(mmc_start_com(self.plugin_config))
asyncio.create_task(message_process())
asyncio.create_task(check_timeout_response())
async def _start_maibot_connection(self):
"""非阻塞方式启动MaiBot连接等待主服务启动后再连接"""
# 等待一段时间让MaiBot主服务完全启动
await asyncio.sleep(5)
max_attempts = 10
attempt = 0
while attempt < max_attempts:
try:
logger.info(f"尝试连接MaiBot (第{attempt + 1}次)")
await mmc_start_com(self.plugin_config)
message_send_instance.maibot_router = router
logger.info("MaiBot router连接已建立")
return
except Exception as e:
attempt += 1
if attempt >= max_attempts:
logger.error(f"MaiBot连接失败已达到最大重试次数: {e}")
return
else:
delay = min(2 + attempt, 10) # 逐渐增加延迟最大10秒
logger.warning(f"MaiBot连接失败: {e}{delay}秒后重试")
await asyncio.sleep(delay)
class StopNapcatAdapterHandler(BaseEventHandler):
"""关闭Adapter"""

View File

@@ -159,7 +159,7 @@ class SimpleMessageBuffer:
# 检查是否超过最大组件数量
if len(session.messages) >= config_api.get_plugin_config(self.plugin_config, "features.message_buffer_max_components", 5):
logger.info(f"会话 {session_id} 消息数量达到上限,强制合并")
logger.debug(f"会话 {session_id} 消息数量达到上限,强制合并")
asyncio.create_task(self._force_merge_session(session_id))
self.buffer_pool[session_id] = BufferedSession(session_id=session_id, original_event=original_event)
session = self.buffer_pool[session_id]
@@ -240,7 +240,7 @@ class SimpleMessageBuffer:
merged_text = "".join(text_parts) # 使用中文逗号连接
message_count = len(session.messages)
logger.info(f"合并会话 {session_id}{message_count} 条文本消息: {merged_text[:100]}...")
logger.debug(f"合并会话 {session_id}{message_count} 条文本消息: {merged_text[:100]}...")
# 调用回调函数
if self.merge_callback:
@@ -294,13 +294,13 @@ class SimpleMessageBuffer:
expired_sessions.append(session_id)
for session_id in expired_sessions:
logger.info(f"清理过期会话: {session_id}")
logger.debug(f"清理过期会话: {session_id}")
await self._force_merge_session(session_id)
async def shutdown(self):
"""关闭消息缓冲器"""
self._shutdown = True
logger.info("正在关闭简化消息缓冲器...")
logger.debug("正在关闭简化消息缓冲器...")
# 刷新所有缓冲区
await self.flush_all()
@@ -311,4 +311,4 @@ class SimpleMessageBuffer:
await self._cancel_session_timers(session)
self.buffer_pool.clear()
logger.info("简化消息缓冲器已关闭")
logger.debug("简化消息缓冲器已关闭")

View File

@@ -300,7 +300,6 @@ class MessageHandler:
if should_use_buffer:
logger.debug(f"尝试缓冲消息,消息类型: {message_type}, 用户: {user_info.user_id}")
logger.debug(f"原始消息段: {raw_message.get('message', [])}")
# 尝试添加到缓冲器
buffered = await self.message_buffer.add_text_message(
@@ -314,10 +313,10 @@ class MessageHandler:
)
if buffered:
logger.info(f"✅ 文本消息已成功缓冲: {user_info.user_id}")
logger.debug(f"✅ 文本消息已成功缓冲: {user_info.user_id}")
return None # 缓冲成功,不立即发送
# 如果缓冲失败(消息包含非文本元素),走正常处理流程
logger.info(f"❌ 消息缓冲失败,包含非文本元素,走正常处理流程: {user_info.user_id}")
logger.debug(f"❌ 消息缓冲失败,包含非文本元素,走正常处理流程: {user_info.user_id}")
# 缓冲失败时继续执行后面的正常处理流程,不要直接返回
logger.debug(f"准备发送消息到MaiBot消息段数量: {len(seg_message)}")
@@ -335,7 +334,7 @@ class MessageHandler:
raw_message=raw_message.get("raw_message"),
)
logger.info("发送到Maibot处理信息")
logger.debug("发送到Maibot处理信息")
await message_send_instance.message_send(message_base)
async def handle_real_message(self, raw_message: dict, in_reply: bool = False) -> List[Seg] | None:
@@ -530,9 +529,7 @@ class MessageHandler:
message_data: dict = raw_message.get("data")
image_sub_type = message_data.get("sub_type")
try:
logger.debug(f"开始下载图片: {message_data.get('url')}")
image_base64 = await get_image_base64(message_data.get("url"))
logger.debug(f"图片下载成功,大小: {len(image_base64)} 字符")
except Exception as e:
logger.error(f"图片消息处理失败: {str(e)}")
return None
@@ -623,8 +620,8 @@ class MessageHandler:
video_url = message_data.get("url")
file_path = message_data.get("filePath") or message_data.get("file_path")
logger.info(f"视频URL: {video_url}")
logger.info(f"视频文件路径: {file_path}")
logger.debug(f"视频URL: {video_url}")
logger.debug(f"视频文件路径: {file_path}")
# 优先使用本地文件路径其次使用URL
video_source = file_path if file_path else video_url
@@ -637,14 +634,14 @@ class MessageHandler:
try:
# 检查是否为本地文件路径
if file_path and Path(file_path).exists():
logger.info(f"使用本地视频文件: {file_path}")
logger.debug(f"使用本地视频文件: {file_path}")
# 直接读取本地文件
with open(file_path, "rb") as f:
video_data = f.read()
# 将视频数据编码为base64用于传输
video_base64 = base64.b64encode(video_data).decode("utf-8")
logger.info(f"视频文件大小: {len(video_data) / (1024 * 1024):.2f} MB")
logger.debug(f"视频文件大小: {len(video_data) / (1024 * 1024):.2f} MB")
# 返回包含详细信息的字典格式
return Seg(
@@ -657,7 +654,7 @@ class MessageHandler:
)
elif video_url:
logger.info(f"使用视频URL下载: {video_url}")
logger.debug(f"使用视频URL下载: {video_url}")
# 使用video_handler下载视频
video_downloader = get_video_downloader()
download_result = await video_downloader.download_video(video_url)
@@ -669,7 +666,7 @@ class MessageHandler:
# 将视频数据编码为base64用于传输
video_base64 = base64.b64encode(download_result["data"]).decode("utf-8")
logger.info(f"视频下载成功,大小: {len(download_result['data']) / (1024 * 1024):.2f} MB")
logger.debug(f"视频下载成功,大小: {len(download_result['data']) / (1024 * 1024):.2f} MB")
# 返回包含详细信息的字典格式
return Seg(
@@ -738,15 +735,15 @@ class MessageHandler:
processed_message: Seg
if image_count < 5 and image_count > 0:
# 处理图片数量小于5的情况此时解析图片为base64
logger.info("图片数量小于5开始解析图片为base64")
logger.debug("图片数量小于5开始解析图片为base64")
processed_message = await self._recursive_parse_image_seg(handled_message, True)
elif image_count > 0:
logger.info("图片数量大于等于5开始解析图片为占位符")
logger.debug("图片数量大于等于5开始解析图片为占位符")
# 处理图片数量大于等于5的情况此时解析图片为占位符
processed_message = await self._recursive_parse_image_seg(handled_message, False)
else:
# 处理没有图片的情况,此时直接返回
logger.info("没有图片,直接返回")
logger.debug("没有图片,直接返回")
processed_message = handled_message
# 添加转发消息提示
@@ -880,7 +877,7 @@ class MessageHandler:
return Seg(type="text", data="[表情包]")
return Seg(type="emoji", data=encoded_image)
else:
logger.info(f"不处理类型: {seg_data.type}")
logger.debug(f"不处理类型: {seg_data.type}")
return seg_data
else:
if seg_data.type == "seglist":
@@ -894,7 +891,7 @@ class MessageHandler:
elif seg_data.type == "emoji":
return Seg(type="text", data="[动画表情]")
else:
logger.info(f"不处理类型: {seg_data.type}")
logger.debug(f"不处理类型: {seg_data.type}")
return seg_data
async def _handle_forward_message(self, message_list: list, layer: int) -> Tuple[Seg, int] | Tuple[None, int]:
@@ -1069,7 +1066,7 @@ class MessageHandler:
raw_message=raw_message.get("raw_message", ""),
)
logger.info(f"发送缓冲合并消息到Maibot处理: {session_id}")
logger.debug(f"发送缓冲合并消息到Maibot处理: {session_id}")
await message_send_instance.message_send(message_base)
except Exception as e:

View File

@@ -15,6 +15,8 @@ class MessageSending:
maibot_router: Router = None
plugin_config = None
_connection_retries = 0
_max_retries = 3
def __init__(self):
pass
@@ -23,6 +25,25 @@ class MessageSending:
"""设置插件配置"""
self.plugin_config = plugin_config
async def _attempt_reconnect(self):
"""尝试重新连接MaiBot router"""
if self._connection_retries < self._max_retries:
self._connection_retries += 1
logger.warning(f"尝试重新连接MaiBot router (第{self._connection_retries}次)")
try:
# 重新导入router
from ..mmc_com_layer import router
self.maibot_router = router
if self.maibot_router is not None:
logger.info("MaiBot router重连成功")
self._connection_retries = 0 # 重置重试计数
return True
except Exception as e:
logger.error(f"重连失败: {e}")
else:
logger.error(f"已达到最大重连次数({self._max_retries}),停止重试")
return False
async def message_send(self, message_base: MessageBase) -> bool:
"""
发送消息Ada -> MMC 方向,需要实现切片)
@@ -30,6 +51,13 @@ class MessageSending:
message_base: MessageBase: 消息基类,包含发送目标和消息内容等信息
"""
try:
# 检查maibot_router是否已初始化
if self.maibot_router is None:
logger.warning("MaiBot router未初始化尝试重新连接")
if not await self._attempt_reconnect():
logger.error("MaiBot router重连失败无法发送消息")
logger.error("请检查与MaiBot之间的连接")
return False
# 检查是否需要切片发送
message_dict = message_base.to_dict()
@@ -45,6 +73,14 @@ class MessageSending:
# 获取对应的客户端并发送切片
platform = message_base.message_info.platform
# 再次检查router状态防止运行时被重置
if self.maibot_router is None or not hasattr(self.maibot_router, 'clients'):
logger.warning("MaiBot router连接已断开尝试重新连接")
if not await self._attempt_reconnect():
logger.error("MaiBot router重连失败切片发送中止")
return False
if platform not in self.maibot_router.clients:
logger.error(f"平台 {platform} 未连接")
return False

View File

@@ -119,7 +119,7 @@ class NoticeHandler:
if config_api.get_plugin_config(self.plugin_config, "features.poke_enabled", True) and await message_handler.check_allow_to_chat(
user_id, group_id, False, False
):
logger.info("处理戳一戳消息")
logger.debug("处理戳一戳消息")
handled_message, user_info = await self.handle_poke_notify(raw_message, group_id, user_id)
else:
logger.warning("戳一戳消息被禁用,取消戳一戳处理")
@@ -191,7 +191,7 @@ class NoticeHandler:
if system_notice:
await self.put_notice(message_base)
else:
logger.info("发送到Maibot处理通知信息")
logger.debug("发送到Maibot处理通知信息")
await message_send_instance.message_send(message_base)
async def handle_poke_notify(
@@ -215,7 +215,7 @@ class NoticeHandler:
if self.last_poke_time > 0:
time_diff = current_time - self.last_poke_time
if time_diff < debounce_seconds:
logger.info(f"戳一戳防抖:用户 {user_id} 的戳一戳被忽略(距离上次戳一戳 {time_diff:.2f} 秒)")
logger.debug(f"戳一戳防抖:用户 {user_id} 的戳一戳被忽略(距离上次戳一戳 {time_diff:.2f} 秒)")
return None, None
# 记录这次戳一戳的时间
@@ -234,7 +234,7 @@ class NoticeHandler:
else:
user_name = "QQ用户"
user_cardname = "QQ用户"
logger.info("无法获取戳一戳对方的用户昵称")
logger.debug("无法获取戳一戳对方的用户昵称")
# 计算Seg
if self_id == target_id:
@@ -248,7 +248,7 @@ class NoticeHandler:
else:
# 如果配置为忽略不是针对自己的戳一戳则直接返回None
if config_api.get_plugin_config(self.plugin_config, "features.non_self_poke_ignored", False):
logger.info("忽略不是针对自己的戳一戳消息")
logger.debug("忽略不是针对自己的戳一戳消息")
return None, None
# 老实说这一步判定没啥意义,毕竟私聊是没有其他人之间的戳一戳,但是感觉可以有这个判定来强限制群聊环境
@@ -258,7 +258,7 @@ class NoticeHandler:
target_name = fetched_member_info.get("nickname")
else:
target_name = "QQ用户"
logger.info("无法获取被戳一戳方的用户昵称")
logger.debug("无法获取被戳一戳方的用户昵称")
display_name = user_name
else:
return None, None
@@ -521,7 +521,7 @@ class NoticeHandler:
continue
if ban_record.lift_time <= int(time.time()):
# 触发自然解除禁言
logger.info(f"检测到用户 {ban_record.user_id} 在群 {ban_record.group_id} 的禁言已解除")
logger.debug(f"检测到用户 {ban_record.user_id} 在群 {ban_record.group_id} 的禁言已解除")
self.lifted_list.append(ban_record)
self.banned_list.remove(ban_record)
await asyncio.sleep(5)

View File

@@ -20,7 +20,7 @@ def set_plugin_config(config: dict):
async def get_response(request_id: str, timeout: int = 10) -> dict:
response = await asyncio.wait_for(_get_response(request_id), timeout)
_ = response_time_dict.pop(request_id)
logger.info(f"响应信息id: {request_id} 已从响应字典中取出")
logger.debug(f"响应信息id: {request_id} 已从响应字典中取出")
return response
@@ -38,7 +38,7 @@ async def put_response(response: dict):
now_time = time.time()
response_dict[echo_id] = response
response_time_dict[echo_id] = now_time
logger.info(f"响应信息id: {echo_id} 已存入响应字典")
logger.debug(f"响应信息id: {echo_id} 已存入响应字典")
async def check_timeout_response() -> None:
@@ -57,5 +57,6 @@ async def check_timeout_response() -> None:
response_dict.pop(echo_id)
response_time_dict.pop(echo_id)
logger.warning(f"响应消息 {echo_id} 超时,已删除")
if cleaned_message_count > 0:
logger.info(f"已删除 {cleaned_message_count} 条超时响应消息")
await asyncio.sleep(heartbeat_interval)