chore: format code and remove redundant blank lines

This commit applies automated code formatting across the project. The changes primarily involve removing unnecessary blank lines and ensuring consistent code style, improving readability and maintainability without altering functionality.
This commit is contained in:
minecraft1024a
2025-09-05 20:58:03 +08:00
parent 488e959577
commit 513757a8ee
37 changed files with 439 additions and 419 deletions

View File

@@ -37,6 +37,7 @@ def get_classes_in_module(module):
classes.append(member)
return classes
async def message_recv(server_connection: Server.ServerConnection):
await message_handler.set_server_connection(server_connection)
asyncio.create_task(notice_handler.set_server_connection(server_connection))
@@ -47,7 +48,7 @@ async def message_recv(server_connection: Server.ServerConnection):
try:
# 首先尝试解析原始消息
decoded_raw_message: dict = json.loads(raw_message)
# 检查是否是切片消息 (来自 MMC)
if chunker.is_chunk_message(decoded_raw_message):
logger.debug("接收到切片消息,尝试重组")
@@ -61,14 +62,14 @@ async def message_recv(server_connection: Server.ServerConnection):
# 切片尚未完整,继续等待更多切片
logger.debug("等待更多切片...")
continue
# 处理完整消息(可能是重组后的,也可能是原本就完整的)
post_type = decoded_raw_message.get("post_type")
if post_type in ["meta_event", "message", "notice"]:
await message_queue.put(decoded_raw_message)
elif post_type is None:
await put_response(decoded_raw_message)
except json.JSONDecodeError as e:
logger.error(f"消息解析失败: {e}")
logger.debug(f"原始消息: {raw_message[:500]}...")
@@ -76,6 +77,7 @@ async def message_recv(server_connection: Server.ServerConnection):
logger.error(f"处理消息时出错: {e}")
logger.debug(f"原始消息: {raw_message[:500]}...")
async def message_process():
"""消息处理主循环"""
logger.info("消息处理器已启动")
@@ -84,7 +86,7 @@ async def message_process():
try:
# 使用超时等待,以便能够响应取消请求
message = await asyncio.wait_for(message_queue.get(), timeout=1.0)
post_type = message.get("post_type")
if post_type == "message":
await message_handler.handle_raw_message(message)
@@ -94,10 +96,10 @@ async def message_process():
await notice_handler.handle_notice(message)
else:
logger.warning(f"未知的post_type: {post_type}")
message_queue.task_done()
await asyncio.sleep(0.05)
except asyncio.TimeoutError:
# 超时是正常的,继续循环
continue
@@ -112,7 +114,7 @@ async def message_process():
except ValueError:
pass
await asyncio.sleep(0.1)
except asyncio.CancelledError:
logger.info("消息处理器已停止")
raise
@@ -132,6 +134,7 @@ async def message_process():
except Exception as e:
logger.debug(f"清理消息队列时出错: {e}")
async def napcat_server():
"""启动 Napcat WebSocket 连接(支持正向和反向连接)"""
mode = global_config.napcat_server.mode
@@ -143,63 +146,61 @@ async def napcat_server():
logger.error(f"启动 WebSocket 连接失败: {e}")
raise
async def graceful_shutdown():
"""优雅关闭所有组件"""
try:
logger.info("正在关闭adapter...")
# 停止消息重组器的清理任务
try:
await reassembler.stop_cleanup_task()
except Exception as e:
logger.warning(f"停止消息重组器清理任务时出错: {e}")
# 停止功能管理器文件监控
try:
await features_manager.stop_file_watcher()
except Exception as e:
logger.warning(f"停止功能管理器文件监控时出错: {e}")
# 关闭消息处理器(包括消息缓冲器)
try:
await message_handler.shutdown()
except Exception as e:
logger.warning(f"关闭消息处理器时出错: {e}")
# 关闭 WebSocket 连接
try:
await websocket_manager.stop_connection()
except Exception as e:
logger.warning(f"关闭WebSocket连接时出错: {e}")
# 关闭 MaiBot 连接
try:
await mmc_stop_com()
except Exception as e:
logger.warning(f"关闭MaiBot连接时出错: {e}")
# 取消所有剩余任务
current_task = asyncio.current_task()
tasks = [t for t in asyncio.all_tasks() if t is not current_task and not t.done()]
if tasks:
logger.info(f"正在取消 {len(tasks)} 个剩余任务...")
for task in tasks:
task.cancel()
# 等待任务取消完成,忽略 CancelledError
try:
await asyncio.wait_for(
asyncio.gather(*tasks, return_exceptions=True),
timeout=10
)
await asyncio.wait_for(asyncio.gather(*tasks, return_exceptions=True), timeout=10)
except asyncio.TimeoutError:
logger.warning("部分任务取消超时")
except Exception as e:
logger.debug(f"任务取消过程中的异常(可忽略): {e}")
logger.info("Adapter已成功关闭")
except Exception as e:
logger.error(f"Adapter关闭中出现错误: {e}")
finally:
@@ -214,6 +215,7 @@ async def graceful_shutdown():
except Exception:
pass
class LauchNapcatAdapterHandler(BaseEventHandler):
"""自动启动Adapter"""
@@ -245,6 +247,7 @@ class LauchNapcatAdapterHandler(BaseEventHandler):
asyncio.create_task(message_process())
asyncio.create_task(check_timeout_response())
class StopNapcatAdapterHandler(BaseEventHandler):
"""关闭Adapter"""
@@ -257,7 +260,7 @@ class StopNapcatAdapterHandler(BaseEventHandler):
async def execute(self, kwargs):
await graceful_shutdown()
return
@register_plugin
class NapcatAdapterPlugin(BasePlugin):
@@ -295,7 +298,7 @@ class NapcatAdapterPlugin(BasePlugin):
def get_plugin_components(self):
self.register_events()
components = []
components.append((LauchNapcatAdapterHandler.get_handler_info(), LauchNapcatAdapterHandler))
components.append((StopNapcatAdapterHandler.get_handler_info(), StopNapcatAdapterHandler))

View File

@@ -58,14 +58,16 @@ class VoiceConfig(ConfigBase):
use_tts: bool = False
"""是否启用TTS功能"""
@dataclass
class SlicingConfig(ConfigBase):
max_frame_size: int = 64
"""WebSocket帧的最大大小单位为字节默认64KB"""
delay_ms: int = 10
"""切片发送间隔时间,单位为毫秒"""
@dataclass
class DebugConfig(ConfigBase):
level: Literal["DEBUG", "INFO", "WARNING", "ERROR", "CRITICAL"] = "INFO"

View File

@@ -3,6 +3,7 @@
用于在 Ada 发送给 MMC 时进行消息切片,利用 WebSocket 协议的自动重组特性
仅在 Ada -> MMC 方向进行切片其他方向MMC -> AdaAda <-> Napcat不切片
"""
import json
import uuid
import asyncio
@@ -15,10 +16,9 @@ from src.common.logger import get_logger
logger = get_logger("napcat_adapter")
class MessageChunker:
"""消息切片器,用于处理大消息的分片发送"""
def __init__(self):
self.max_chunk_size = global_config.slicing.max_frame_size * 1024
@@ -29,19 +29,21 @@ class MessageChunker:
message_str = json.dumps(message, ensure_ascii=False)
else:
message_str = message
return len(message_str.encode('utf-8')) > self.max_chunk_size
return len(message_str.encode("utf-8")) > self.max_chunk_size
except Exception as e:
logger.error(f"检查消息大小时出错: {e}")
return False
def chunk_message(self, message: Union[str, Dict[str, Any]], chunk_id: Optional[str] = None) -> List[Dict[str, Any]]:
def chunk_message(
self, message: Union[str, Dict[str, Any]], chunk_id: Optional[str] = None
) -> List[Dict[str, Any]]:
"""
将消息切片
Args:
message: 要切片的消息(字符串或字典)
chunk_id: 切片组ID如果不提供则自动生成
Returns:
切片后的消息字典列表
"""
@@ -51,30 +53,30 @@ class MessageChunker:
message_str = json.dumps(message, ensure_ascii=False)
else:
message_str = message
if not self.should_chunk_message(message_str):
# 不需要切片的情况,如果输入是字典则返回字典,如果是字符串则包装成非切片标记的字典
if isinstance(message, dict):
return [message]
else:
return [{"_original_message": message_str}]
if chunk_id is None:
chunk_id = str(uuid.uuid4())
message_bytes = message_str.encode('utf-8')
message_bytes = message_str.encode("utf-8")
total_size = len(message_bytes)
# 计算需要多少个切片
num_chunks = (total_size + self.max_chunk_size - 1) // self.max_chunk_size
chunks = []
for i in range(num_chunks):
start_pos = i * self.max_chunk_size
end_pos = min(start_pos + self.max_chunk_size, total_size)
chunk_data = message_bytes[start_pos:end_pos]
# 构建切片消息
chunk_message = {
"__mmc_chunk_info__": {
@@ -83,17 +85,17 @@ class MessageChunker:
"total_chunks": num_chunks,
"chunk_size": len(chunk_data),
"total_size": total_size,
"timestamp": time.time()
"timestamp": time.time(),
},
"__mmc_chunk_data__": chunk_data.decode('utf-8', errors='ignore'),
"__mmc_is_chunked__": True
"__mmc_chunk_data__": chunk_data.decode("utf-8", errors="ignore"),
"__mmc_is_chunked__": True,
}
chunks.append(chunk_message)
logger.debug(f"消息切片完成: {total_size} bytes -> {num_chunks} chunks (ID: {chunk_id})")
return chunks
except Exception as e:
logger.error(f"消息切片时出错: {e}")
# 出错时返回原消息
@@ -101,7 +103,7 @@ class MessageChunker:
return [message]
else:
return [{"_original_message": message}]
def is_chunk_message(self, message: Union[str, Dict[str, Any]]) -> bool:
"""判断是否是切片消息"""
try:
@@ -109,12 +111,12 @@ class MessageChunker:
data = json.loads(message)
else:
data = message
return (
isinstance(data, dict) and
"__mmc_chunk_info__" in data and
"__mmc_chunk_data__" in data and
"__mmc_is_chunked__" in data
isinstance(data, dict)
and "__mmc_chunk_info__" in data
and "__mmc_chunk_data__" in data
and "__mmc_is_chunked__" in data
)
except (json.JSONDecodeError, TypeError):
return False
@@ -122,17 +124,17 @@ class MessageChunker:
class MessageReassembler:
"""消息重组器,用于重组接收到的切片消息"""
def __init__(self, timeout: int = 30):
self.timeout = timeout
self.chunk_buffers: Dict[str, Dict[str, Any]] = {}
self._cleanup_task = None
async def start_cleanup_task(self):
"""启动清理任务"""
if self._cleanup_task is None:
self._cleanup_task = asyncio.create_task(self._cleanup_expired_chunks())
async def stop_cleanup_task(self):
"""停止清理任务"""
if self._cleanup_task:
@@ -142,35 +144,35 @@ class MessageReassembler:
except asyncio.CancelledError:
pass
self._cleanup_task = None
async def _cleanup_expired_chunks(self):
"""清理过期的切片缓冲区"""
while True:
try:
await asyncio.sleep(10) # 每10秒检查一次
current_time = time.time()
expired_chunks = []
for chunk_id, buffer_info in self.chunk_buffers.items():
if current_time - buffer_info['timestamp'] > self.timeout:
if current_time - buffer_info["timestamp"] > self.timeout:
expired_chunks.append(chunk_id)
for chunk_id in expired_chunks:
logger.warning(f"清理过期的切片缓冲区: {chunk_id}")
del self.chunk_buffers[chunk_id]
except asyncio.CancelledError:
break
except Exception as e:
logger.error(f"清理过期切片时出错: {e}")
async def add_chunk(self, message: Union[str, Dict[str, Any]]) -> Optional[Dict[str, Any]]:
"""
添加切片,如果切片完整则返回重组后的消息
Args:
message: 切片消息(字符串或字典)
Returns:
如果切片完整则返回重组后的原始消息字典否则返回None
"""
@@ -180,7 +182,7 @@ class MessageReassembler:
chunk_data = json.loads(message)
else:
chunk_data = message
# 检查是否是切片消息
if not chunker.is_chunk_message(chunk_data):
# 不是切片消息,直接返回
@@ -192,38 +194,38 @@ class MessageReassembler:
return {"text_message": chunk_data["_original_message"]}
else:
return chunk_data
chunk_info = chunk_data["__mmc_chunk_info__"]
chunk_content = chunk_data["__mmc_chunk_data__"]
chunk_id = chunk_info["chunk_id"]
chunk_index = chunk_info["chunk_index"]
total_chunks = chunk_info["total_chunks"]
chunk_timestamp = chunk_info.get("timestamp", time.time())
# 初始化缓冲区
if chunk_id not in self.chunk_buffers:
self.chunk_buffers[chunk_id] = {
"chunks": {},
"total_chunks": total_chunks,
"received_chunks": 0,
"timestamp": chunk_timestamp
"timestamp": chunk_timestamp,
}
buffer = self.chunk_buffers[chunk_id]
# 检查切片是否已经接收过
if chunk_index in buffer["chunks"]:
logger.warning(f"重复接收切片: {chunk_id}#{chunk_index}")
return None
# 添加切片
buffer["chunks"][chunk_index] = chunk_content
buffer["received_chunks"] += 1
buffer["timestamp"] = time.time() # 更新时间戳
logger.debug(f"接收切片: {chunk_id}#{chunk_index} ({buffer['received_chunks']}/{total_chunks})")
# 检查是否接收完整
if buffer["received_chunks"] == total_chunks:
# 重组消息
@@ -233,25 +235,25 @@ class MessageReassembler:
logger.error(f"切片 {chunk_id}#{i} 缺失,无法重组")
return None
reassembled_message += buffer["chunks"][i]
# 清理缓冲区
del self.chunk_buffers[chunk_id]
logger.debug(f"消息重组完成: {chunk_id} ({len(reassembled_message)} chars)")
# 尝试反序列化重组后的消息
try:
return json.loads(reassembled_message)
except json.JSONDecodeError:
# 如果不能反序列化为JSON则作为文本消息返回
return {"text_message": reassembled_message}
return None
except (json.JSONDecodeError, KeyError, TypeError) as e:
logger.error(f"处理切片消息时出错: {e}")
return None
def get_pending_chunks_info(self) -> Dict[str, Any]:
"""获取待处理切片信息"""
info = {}
@@ -260,11 +262,11 @@ class MessageReassembler:
"received": buffer["received_chunks"],
"total": buffer["total_chunks"],
"progress": f"{buffer['received_chunks']}/{buffer['total_chunks']}",
"age_seconds": time.time() - buffer["timestamp"]
"age_seconds": time.time() - buffer["timestamp"],
}
return info
# 全局实例
chunker = MessageChunker()
reassembler = MessageReassembler()
reassembler = MessageReassembler()

View File

@@ -743,31 +743,31 @@ class MessageHandler:
"""
message_data: dict = raw_message.get("data", {})
json_data = message_data.get("data", "")
# 检查JSON消息格式
if not message_data or "data" not in message_data:
logger.warning("JSON消息格式不正确")
return Seg(type="json", data=json.dumps(message_data))
try:
nested_data = json.loads(json_data)
# 检查是否是QQ小程序分享消息
if "app" in nested_data and "com.tencent.miniapp" in str(nested_data.get("app", "")):
logger.debug("检测到QQ小程序分享消息开始提取信息")
# 提取目标字段
extracted_info = {}
# 提取 meta.detail_1 中的信息
meta = nested_data.get("meta", {})
detail_1 = meta.get("detail_1", {})
if detail_1:
extracted_info["title"] = detail_1.get("title", "")
extracted_info["desc"] = detail_1.get("desc", "")
qqdocurl = detail_1.get("qqdocurl", "")
# 从qqdocurl中提取b23.tv短链接
if qqdocurl and "b23.tv" in qqdocurl:
# 查找b23.tv链接的起始位置
@@ -785,26 +785,29 @@ class MessageHandler:
extracted_info["short_url"] = qqdocurl
else:
extracted_info["short_url"] = qqdocurl
# 如果成功提取到关键信息,返回格式化的文本
if extracted_info.get("title") or extracted_info.get("desc") or extracted_info.get("short_url"):
content_parts = []
if extracted_info.get("title"):
content_parts.append(f"来源: {extracted_info['title']}")
if extracted_info.get("desc"):
content_parts.append(f"标题: {extracted_info['desc']}")
if extracted_info.get("short_url"):
content_parts.append(f"链接: {extracted_info['short_url']}")
formatted_content = "\n".join(content_parts)
return Seg(type="text", data=f"这是一条小程序分享消息,可以根据来源,考虑使用对应解析工具\n{formatted_content}")
return Seg(
type="text",
data=f"这是一条小程序分享消息,可以根据来源,考虑使用对应解析工具\n{formatted_content}",
)
# 如果没有提取到关键信息返回None
return None
except json.JSONDecodeError as e:
logger.error(f"解析JSON消息失败: {e}")
return None

View File

@@ -28,36 +28,36 @@ class MessageSending:
try:
# 检查是否需要切片发送
message_dict = message_base.to_dict()
if chunker.should_chunk_message(message_dict):
logger.info(f"消息过大,进行切片发送到 MaiBot")
# 切片消息
chunks = chunker.chunk_message(message_dict)
# 逐个发送切片
for i, chunk in enumerate(chunks):
logger.debug(f"发送切片 {i+1}/{len(chunks)} 到 MaiBot")
logger.debug(f"发送切片 {i + 1}/{len(chunks)} 到 MaiBot")
# 获取对应的客户端并发送切片
platform = message_base.message_info.platform
if platform not in self.maibot_router.clients:
logger.error(f"平台 {platform} 未连接")
return False
client = self.maibot_router.clients[platform]
send_status = await client.send_message(chunk)
if not send_status:
logger.error(f"发送切片 {i+1}/{len(chunks)} 失败")
logger.error(f"发送切片 {i + 1}/{len(chunks)} 失败")
return False
# 使用配置中的延迟时间
if i < len(chunks) - 1:
delay_seconds = global_config.slicing.delay_ms / 1000.0
logger.debug(f"切片发送延迟: {global_config.slicing.delay_ms}毫秒")
await asyncio.sleep(delay_seconds)
logger.debug("所有切片发送完成")
return True
else:
@@ -66,7 +66,7 @@ class MessageSending:
if not send_status:
raise RuntimeError("可能是路由未正确配置或连接异常")
return send_status
except Exception as e:
logger.error(f"发送消息失败: {str(e)}")
logger.error("请检查与MaiBot之间的连接")