This commit is contained in:
Furina-1013-create
2025-09-06 23:00:06 +08:00
6 changed files with 97 additions and 36 deletions

View File

@@ -40,6 +40,8 @@ async def message_recv(server_connection: Server.ServerConnection):
asyncio.create_task(notice_handler.set_server_connection(server_connection)) asyncio.create_task(notice_handler.set_server_connection(server_connection))
await send_handler.set_server_connection(server_connection) await send_handler.set_server_connection(server_connection)
async for raw_message in server_connection: async for raw_message in server_connection:
# 只在debug模式下记录原始消息
if logger.level <= 10: # DEBUG level
logger.debug(f"{raw_message[:1500]}..." if (len(raw_message) > 1500) else raw_message) logger.debug(f"{raw_message[:1500]}..." if (len(raw_message) > 1500) else raw_message)
decoded_raw_message: dict = json.loads(raw_message) decoded_raw_message: dict = json.loads(raw_message)
try: try:
@@ -225,13 +227,38 @@ class LauchNapcatAdapterHandler(BaseEventHandler):
await reassembler.start_cleanup_task() await reassembler.start_cleanup_task()
logger.info("开始启动Napcat Adapter") logger.info("开始启动Napcat Adapter")
message_send_instance.maibot_router = router
# 创建单独的异步任务,防止阻塞主线程 # 创建单独的异步任务,防止阻塞主线程
asyncio.create_task(self._start_maibot_connection())
asyncio.create_task(napcat_server(self.plugin_config)) asyncio.create_task(napcat_server(self.plugin_config))
asyncio.create_task(mmc_start_com(self.plugin_config))
asyncio.create_task(message_process()) asyncio.create_task(message_process())
asyncio.create_task(check_timeout_response()) asyncio.create_task(check_timeout_response())
async def _start_maibot_connection(self):
"""非阻塞方式启动MaiBot连接等待主服务启动后再连接"""
# 等待一段时间让MaiBot主服务完全启动
await asyncio.sleep(5)
max_attempts = 10
attempt = 0
while attempt < max_attempts:
try:
logger.info(f"尝试连接MaiBot (第{attempt + 1}次)")
await mmc_start_com(self.plugin_config)
message_send_instance.maibot_router = router
logger.info("MaiBot router连接已建立")
return
except Exception as e:
attempt += 1
if attempt >= max_attempts:
logger.error(f"MaiBot连接失败已达到最大重试次数: {e}")
return
else:
delay = min(2 + attempt, 10) # 逐渐增加延迟最大10秒
logger.warning(f"MaiBot连接失败: {e}{delay}秒后重试")
await asyncio.sleep(delay)
class StopNapcatAdapterHandler(BaseEventHandler): class StopNapcatAdapterHandler(BaseEventHandler):
"""关闭Adapter""" """关闭Adapter"""

View File

@@ -159,7 +159,7 @@ class SimpleMessageBuffer:
# 检查是否超过最大组件数量 # 检查是否超过最大组件数量
if len(session.messages) >= config_api.get_plugin_config(self.plugin_config, "features.message_buffer_max_components", 5): if len(session.messages) >= config_api.get_plugin_config(self.plugin_config, "features.message_buffer_max_components", 5):
logger.info(f"会话 {session_id} 消息数量达到上限,强制合并") logger.debug(f"会话 {session_id} 消息数量达到上限,强制合并")
asyncio.create_task(self._force_merge_session(session_id)) asyncio.create_task(self._force_merge_session(session_id))
self.buffer_pool[session_id] = BufferedSession(session_id=session_id, original_event=original_event) self.buffer_pool[session_id] = BufferedSession(session_id=session_id, original_event=original_event)
session = self.buffer_pool[session_id] session = self.buffer_pool[session_id]
@@ -240,7 +240,7 @@ class SimpleMessageBuffer:
merged_text = "".join(text_parts) # 使用中文逗号连接 merged_text = "".join(text_parts) # 使用中文逗号连接
message_count = len(session.messages) message_count = len(session.messages)
logger.info(f"合并会话 {session_id}{message_count} 条文本消息: {merged_text[:100]}...") logger.debug(f"合并会话 {session_id}{message_count} 条文本消息: {merged_text[:100]}...")
# 调用回调函数 # 调用回调函数
if self.merge_callback: if self.merge_callback:
@@ -294,13 +294,13 @@ class SimpleMessageBuffer:
expired_sessions.append(session_id) expired_sessions.append(session_id)
for session_id in expired_sessions: for session_id in expired_sessions:
logger.info(f"清理过期会话: {session_id}") logger.debug(f"清理过期会话: {session_id}")
await self._force_merge_session(session_id) await self._force_merge_session(session_id)
async def shutdown(self): async def shutdown(self):
"""关闭消息缓冲器""" """关闭消息缓冲器"""
self._shutdown = True self._shutdown = True
logger.info("正在关闭简化消息缓冲器...") logger.debug("正在关闭简化消息缓冲器...")
# 刷新所有缓冲区 # 刷新所有缓冲区
await self.flush_all() await self.flush_all()
@@ -311,4 +311,4 @@ class SimpleMessageBuffer:
await self._cancel_session_timers(session) await self._cancel_session_timers(session)
self.buffer_pool.clear() self.buffer_pool.clear()
logger.info("简化消息缓冲器已关闭") logger.debug("简化消息缓冲器已关闭")

View File

@@ -315,7 +315,6 @@ class MessageHandler:
if should_use_buffer: if should_use_buffer:
logger.debug(f"尝试缓冲消息,消息类型: {message_type}, 用户: {user_info.user_id}") logger.debug(f"尝试缓冲消息,消息类型: {message_type}, 用户: {user_info.user_id}")
logger.debug(f"原始消息段: {raw_message.get('message', [])}")
# 尝试添加到缓冲器 # 尝试添加到缓冲器
buffered = await self.message_buffer.add_text_message( buffered = await self.message_buffer.add_text_message(
@@ -329,10 +328,10 @@ class MessageHandler:
) )
if buffered: if buffered:
logger.info(f"✅ 文本消息已成功缓冲: {user_info.user_id}") logger.debug(f"✅ 文本消息已成功缓冲: {user_info.user_id}")
return None # 缓冲成功,不立即发送 return None # 缓冲成功,不立即发送
# 如果缓冲失败(消息包含非文本元素),走正常处理流程 # 如果缓冲失败(消息包含非文本元素),走正常处理流程
logger.info(f"❌ 消息缓冲失败,包含非文本元素,走正常处理流程: {user_info.user_id}") logger.debug(f"❌ 消息缓冲失败,包含非文本元素,走正常处理流程: {user_info.user_id}")
# 缓冲失败时继续执行后面的正常处理流程,不要直接返回 # 缓冲失败时继续执行后面的正常处理流程,不要直接返回
logger.debug(f"准备发送消息到MaiBot消息段数量: {len(seg_message)}") logger.debug(f"准备发送消息到MaiBot消息段数量: {len(seg_message)}")
@@ -350,7 +349,7 @@ class MessageHandler:
raw_message=raw_message.get("raw_message"), raw_message=raw_message.get("raw_message"),
) )
logger.info("发送到Maibot处理信息") logger.debug("发送到Maibot处理信息")
await message_send_instance.message_send(message_base) await message_send_instance.message_send(message_base)
async def handle_real_message(self, raw_message: dict, in_reply: bool = False) -> List[Seg] | None: async def handle_real_message(self, raw_message: dict, in_reply: bool = False) -> List[Seg] | None:
@@ -559,9 +558,7 @@ class MessageHandler:
message_data: dict = raw_message.get("data") message_data: dict = raw_message.get("data")
image_sub_type = message_data.get("sub_type") image_sub_type = message_data.get("sub_type")
try: try:
logger.debug(f"开始下载图片: {message_data.get('url')}")
image_base64 = await get_image_base64(message_data.get("url")) image_base64 = await get_image_base64(message_data.get("url"))
logger.debug(f"图片下载成功,大小: {len(image_base64)} 字符")
except Exception as e: except Exception as e:
logger.error(f"图片消息处理失败: {str(e)}") logger.error(f"图片消息处理失败: {str(e)}")
return None return None
@@ -652,8 +649,8 @@ class MessageHandler:
video_url = message_data.get("url") video_url = message_data.get("url")
file_path = message_data.get("filePath") or message_data.get("file_path") file_path = message_data.get("filePath") or message_data.get("file_path")
logger.info(f"视频URL: {video_url}") logger.debug(f"视频URL: {video_url}")
logger.info(f"视频文件路径: {file_path}") logger.debug(f"视频文件路径: {file_path}")
# 优先使用本地文件路径其次使用URL # 优先使用本地文件路径其次使用URL
video_source = file_path if file_path else video_url video_source = file_path if file_path else video_url
@@ -666,14 +663,14 @@ class MessageHandler:
try: try:
# 检查是否为本地文件路径 # 检查是否为本地文件路径
if file_path and Path(file_path).exists(): if file_path and Path(file_path).exists():
logger.info(f"使用本地视频文件: {file_path}") logger.debug(f"使用本地视频文件: {file_path}")
# 直接读取本地文件 # 直接读取本地文件
with open(file_path, "rb") as f: with open(file_path, "rb") as f:
video_data = f.read() video_data = f.read()
# 将视频数据编码为base64用于传输 # 将视频数据编码为base64用于传输
video_base64 = base64.b64encode(video_data).decode("utf-8") video_base64 = base64.b64encode(video_data).decode("utf-8")
logger.info(f"视频文件大小: {len(video_data) / (1024 * 1024):.2f} MB") logger.debug(f"视频文件大小: {len(video_data) / (1024 * 1024):.2f} MB")
# 返回包含详细信息的字典格式 # 返回包含详细信息的字典格式
return Seg( return Seg(
@@ -686,7 +683,7 @@ class MessageHandler:
) )
elif video_url: elif video_url:
logger.info(f"使用视频URL下载: {video_url}") logger.debug(f"使用视频URL下载: {video_url}")
# 使用video_handler下载视频 # 使用video_handler下载视频
video_downloader = get_video_downloader() video_downloader = get_video_downloader()
download_result = await video_downloader.download_video(video_url) download_result = await video_downloader.download_video(video_url)
@@ -698,7 +695,7 @@ class MessageHandler:
# 将视频数据编码为base64用于传输 # 将视频数据编码为base64用于传输
video_base64 = base64.b64encode(download_result["data"]).decode("utf-8") video_base64 = base64.b64encode(download_result["data"]).decode("utf-8")
logger.info(f"视频下载成功,大小: {len(download_result['data']) / (1024 * 1024):.2f} MB") logger.debug(f"视频下载成功,大小: {len(download_result['data']) / (1024 * 1024):.2f} MB")
# 返回包含详细信息的字典格式 # 返回包含详细信息的字典格式
return Seg( return Seg(
@@ -767,15 +764,15 @@ class MessageHandler:
processed_message: Seg processed_message: Seg
if image_count < 5 and image_count > 0: if image_count < 5 and image_count > 0:
# 处理图片数量小于5的情况此时解析图片为base64 # 处理图片数量小于5的情况此时解析图片为base64
logger.info("图片数量小于5开始解析图片为base64") logger.debug("图片数量小于5开始解析图片为base64")
processed_message = await self._recursive_parse_image_seg(handled_message, True) processed_message = await self._recursive_parse_image_seg(handled_message, True)
elif image_count > 0: elif image_count > 0:
logger.info("图片数量大于等于5开始解析图片为占位符") logger.debug("图片数量大于等于5开始解析图片为占位符")
# 处理图片数量大于等于5的情况此时解析图片为占位符 # 处理图片数量大于等于5的情况此时解析图片为占位符
processed_message = await self._recursive_parse_image_seg(handled_message, False) processed_message = await self._recursive_parse_image_seg(handled_message, False)
else: else:
# 处理没有图片的情况,此时直接返回 # 处理没有图片的情况,此时直接返回
logger.info("没有图片,直接返回") logger.debug("没有图片,直接返回")
processed_message = handled_message processed_message = handled_message
# 添加转发消息提示 # 添加转发消息提示
@@ -909,7 +906,7 @@ class MessageHandler:
return Seg(type="text", data="[表情包]") return Seg(type="text", data="[表情包]")
return Seg(type="emoji", data=encoded_image) return Seg(type="emoji", data=encoded_image)
else: else:
logger.info(f"不处理类型: {seg_data.type}") logger.debug(f"不处理类型: {seg_data.type}")
return seg_data return seg_data
else: else:
if seg_data.type == "seglist": if seg_data.type == "seglist":
@@ -923,7 +920,7 @@ class MessageHandler:
elif seg_data.type == "emoji": elif seg_data.type == "emoji":
return Seg(type="text", data="[动画表情]") return Seg(type="text", data="[动画表情]")
else: else:
logger.info(f"不处理类型: {seg_data.type}") logger.debug(f"不处理类型: {seg_data.type}")
return seg_data return seg_data
async def _handle_forward_message(self, message_list: list, layer: int) -> Tuple[Seg, int] | Tuple[None, int]: async def _handle_forward_message(self, message_list: list, layer: int) -> Tuple[Seg, int] | Tuple[None, int]:
@@ -1098,7 +1095,7 @@ class MessageHandler:
raw_message=raw_message.get("raw_message", ""), raw_message=raw_message.get("raw_message", ""),
) )
logger.info(f"发送缓冲合并消息到Maibot处理: {session_id}") logger.debug(f"发送缓冲合并消息到Maibot处理: {session_id}")
await message_send_instance.message_send(message_base) await message_send_instance.message_send(message_base)
except Exception as e: except Exception as e:

View File

@@ -15,6 +15,8 @@ class MessageSending:
maibot_router: Router = None maibot_router: Router = None
plugin_config = None plugin_config = None
_connection_retries = 0
_max_retries = 3
def __init__(self): def __init__(self):
pass pass
@@ -23,6 +25,25 @@ class MessageSending:
"""设置插件配置""" """设置插件配置"""
self.plugin_config = plugin_config self.plugin_config = plugin_config
async def _attempt_reconnect(self):
"""尝试重新连接MaiBot router"""
if self._connection_retries < self._max_retries:
self._connection_retries += 1
logger.warning(f"尝试重新连接MaiBot router (第{self._connection_retries}次)")
try:
# 重新导入router
from ..mmc_com_layer import router
self.maibot_router = router
if self.maibot_router is not None:
logger.info("MaiBot router重连成功")
self._connection_retries = 0 # 重置重试计数
return True
except Exception as e:
logger.error(f"重连失败: {e}")
else:
logger.error(f"已达到最大重连次数({self._max_retries}),停止重试")
return False
async def message_send(self, message_base: MessageBase) -> bool: async def message_send(self, message_base: MessageBase) -> bool:
""" """
发送消息Ada -> MMC 方向,需要实现切片) 发送消息Ada -> MMC 方向,需要实现切片)
@@ -30,6 +51,13 @@ class MessageSending:
message_base: MessageBase: 消息基类,包含发送目标和消息内容等信息 message_base: MessageBase: 消息基类,包含发送目标和消息内容等信息
""" """
try: try:
# 检查maibot_router是否已初始化
if self.maibot_router is None:
logger.warning("MaiBot router未初始化尝试重新连接")
if not await self._attempt_reconnect():
logger.error("MaiBot router重连失败无法发送消息")
logger.error("请检查与MaiBot之间的连接")
return False
# 检查是否需要切片发送 # 检查是否需要切片发送
message_dict = message_base.to_dict() message_dict = message_base.to_dict()
@@ -45,6 +73,14 @@ class MessageSending:
# 获取对应的客户端并发送切片 # 获取对应的客户端并发送切片
platform = message_base.message_info.platform platform = message_base.message_info.platform
# 再次检查router状态防止运行时被重置
if self.maibot_router is None or not hasattr(self.maibot_router, 'clients'):
logger.warning("MaiBot router连接已断开尝试重新连接")
if not await self._attempt_reconnect():
logger.error("MaiBot router重连失败切片发送中止")
return False
if platform not in self.maibot_router.clients: if platform not in self.maibot_router.clients:
logger.error(f"平台 {platform} 未连接") logger.error(f"平台 {platform} 未连接")
return False return False

View File

@@ -119,7 +119,7 @@ class NoticeHandler:
if config_api.get_plugin_config(self.plugin_config, "features.poke_enabled", True) and await message_handler.check_allow_to_chat( if config_api.get_plugin_config(self.plugin_config, "features.poke_enabled", True) and await message_handler.check_allow_to_chat(
user_id, group_id, False, False user_id, group_id, False, False
): ):
logger.info("处理戳一戳消息") logger.debug("处理戳一戳消息")
handled_message, user_info = await self.handle_poke_notify(raw_message, group_id, user_id) handled_message, user_info = await self.handle_poke_notify(raw_message, group_id, user_id)
else: else:
logger.warning("戳一戳消息被禁用,取消戳一戳处理") logger.warning("戳一戳消息被禁用,取消戳一戳处理")
@@ -191,7 +191,7 @@ class NoticeHandler:
if system_notice: if system_notice:
await self.put_notice(message_base) await self.put_notice(message_base)
else: else:
logger.info("发送到Maibot处理通知信息") logger.debug("发送到Maibot处理通知信息")
await message_send_instance.message_send(message_base) await message_send_instance.message_send(message_base)
async def handle_poke_notify( async def handle_poke_notify(
@@ -215,7 +215,7 @@ class NoticeHandler:
if self.last_poke_time > 0: if self.last_poke_time > 0:
time_diff = current_time - self.last_poke_time time_diff = current_time - self.last_poke_time
if time_diff < debounce_seconds: if time_diff < debounce_seconds:
logger.info(f"戳一戳防抖:用户 {user_id} 的戳一戳被忽略(距离上次戳一戳 {time_diff:.2f} 秒)") logger.debug(f"戳一戳防抖:用户 {user_id} 的戳一戳被忽略(距离上次戳一戳 {time_diff:.2f} 秒)")
return None, None return None, None
# 记录这次戳一戳的时间 # 记录这次戳一戳的时间
@@ -234,7 +234,7 @@ class NoticeHandler:
else: else:
user_name = "QQ用户" user_name = "QQ用户"
user_cardname = "QQ用户" user_cardname = "QQ用户"
logger.info("无法获取戳一戳对方的用户昵称") logger.debug("无法获取戳一戳对方的用户昵称")
# 计算Seg # 计算Seg
if self_id == target_id: if self_id == target_id:
@@ -248,7 +248,7 @@ class NoticeHandler:
else: else:
# 如果配置为忽略不是针对自己的戳一戳则直接返回None # 如果配置为忽略不是针对自己的戳一戳则直接返回None
if config_api.get_plugin_config(self.plugin_config, "features.non_self_poke_ignored", False): if config_api.get_plugin_config(self.plugin_config, "features.non_self_poke_ignored", False):
logger.info("忽略不是针对自己的戳一戳消息") logger.debug("忽略不是针对自己的戳一戳消息")
return None, None return None, None
# 老实说这一步判定没啥意义,毕竟私聊是没有其他人之间的戳一戳,但是感觉可以有这个判定来强限制群聊环境 # 老实说这一步判定没啥意义,毕竟私聊是没有其他人之间的戳一戳,但是感觉可以有这个判定来强限制群聊环境
@@ -258,7 +258,7 @@ class NoticeHandler:
target_name = fetched_member_info.get("nickname") target_name = fetched_member_info.get("nickname")
else: else:
target_name = "QQ用户" target_name = "QQ用户"
logger.info("无法获取被戳一戳方的用户昵称") logger.debug("无法获取被戳一戳方的用户昵称")
display_name = user_name display_name = user_name
else: else:
return None, None return None, None
@@ -521,7 +521,7 @@ class NoticeHandler:
continue continue
if ban_record.lift_time <= int(time.time()): if ban_record.lift_time <= int(time.time()):
# 触发自然解除禁言 # 触发自然解除禁言
logger.info(f"检测到用户 {ban_record.user_id} 在群 {ban_record.group_id} 的禁言已解除") logger.debug(f"检测到用户 {ban_record.user_id} 在群 {ban_record.group_id} 的禁言已解除")
self.lifted_list.append(ban_record) self.lifted_list.append(ban_record)
self.banned_list.remove(ban_record) self.banned_list.remove(ban_record)
await asyncio.sleep(5) await asyncio.sleep(5)

View File

@@ -20,7 +20,7 @@ def set_plugin_config(config: dict):
async def get_response(request_id: str, timeout: int = 10) -> dict: async def get_response(request_id: str, timeout: int = 10) -> dict:
response = await asyncio.wait_for(_get_response(request_id), timeout) response = await asyncio.wait_for(_get_response(request_id), timeout)
_ = response_time_dict.pop(request_id) _ = response_time_dict.pop(request_id)
logger.info(f"响应信息id: {request_id} 已从响应字典中取出") logger.debug(f"响应信息id: {request_id} 已从响应字典中取出")
return response return response
@@ -38,7 +38,7 @@ async def put_response(response: dict):
now_time = time.time() now_time = time.time()
response_dict[echo_id] = response response_dict[echo_id] = response
response_time_dict[echo_id] = now_time response_time_dict[echo_id] = now_time
logger.info(f"响应信息id: {echo_id} 已存入响应字典") logger.debug(f"响应信息id: {echo_id} 已存入响应字典")
async def check_timeout_response() -> None: async def check_timeout_response() -> None:
@@ -57,5 +57,6 @@ async def check_timeout_response() -> None:
response_dict.pop(echo_id) response_dict.pop(echo_id)
response_time_dict.pop(echo_id) response_time_dict.pop(echo_id)
logger.warning(f"响应消息 {echo_id} 超时,已删除") logger.warning(f"响应消息 {echo_id} 超时,已删除")
if cleaned_message_count > 0:
logger.info(f"已删除 {cleaned_message_count} 条超时响应消息") logger.info(f"已删除 {cleaned_message_count} 条超时响应消息")
await asyncio.sleep(heartbeat_interval) await asyncio.sleep(heartbeat_interval)