fix: 小修
This commit is contained in:
@@ -1,160 +0,0 @@
|
||||
import base64
|
||||
from typing import Any, Dict, List, Union
|
||||
|
||||
"""
|
||||
OneBot v11 Message Segment Builder
|
||||
|
||||
This module provides classes for building message segments that conform to the
|
||||
OneBot v11 standard. These segments can be used to construct complex messages
|
||||
for sending through bots that implement the OneBot interface.
|
||||
"""
|
||||
|
||||
|
||||
class Segment:
|
||||
"""Base class for all message segments."""
|
||||
|
||||
def __init__(self, type_: str, data: Dict[str, Any]):
|
||||
self.type = type_
|
||||
self.data = data
|
||||
|
||||
def to_dict(self) -> Dict[str, Any]:
|
||||
"""Convert the segment to a dictionary format."""
|
||||
return {"type": self.type, "data": self.data}
|
||||
|
||||
|
||||
class Text(Segment):
|
||||
"""Text message segment."""
|
||||
|
||||
def __init__(self, text: str):
|
||||
super().__init__("text", {"text": text})
|
||||
|
||||
|
||||
class Face(Segment):
|
||||
"""Face/emoji message segment."""
|
||||
|
||||
def __init__(self, face_id: int):
|
||||
super().__init__("face", {"id": str(face_id)})
|
||||
|
||||
|
||||
class Image(Segment):
|
||||
"""Image message segment."""
|
||||
|
||||
@classmethod
|
||||
def from_url(cls, url: str) -> "Image":
|
||||
"""Create an Image segment from a URL."""
|
||||
return cls(url=url)
|
||||
|
||||
@classmethod
|
||||
def from_path(cls, path: str) -> "Image":
|
||||
"""Create an Image segment from a file path."""
|
||||
with open(path, "rb") as f:
|
||||
file_b64 = base64.b64encode(f.read()).decode("utf-8")
|
||||
return cls(file=f"base64://{file_b64}")
|
||||
|
||||
def __init__(self, file: str = None, url: str = None, cache: bool = True):
|
||||
data = {}
|
||||
if file:
|
||||
data["file"] = file
|
||||
if url:
|
||||
data["url"] = url
|
||||
if not cache:
|
||||
data["cache"] = "0"
|
||||
super().__init__("image", data)
|
||||
|
||||
|
||||
class At(Segment):
|
||||
"""@Someone message segment."""
|
||||
|
||||
def __init__(self, user_id: Union[int, str]):
|
||||
data = {"qq": str(user_id)}
|
||||
super().__init__("at", data)
|
||||
|
||||
|
||||
class Record(Segment):
|
||||
"""Voice message segment."""
|
||||
|
||||
def __init__(self, file: str, magic: bool = False, cache: bool = True):
|
||||
data = {"file": file}
|
||||
if magic:
|
||||
data["magic"] = "1"
|
||||
if not cache:
|
||||
data["cache"] = "0"
|
||||
super().__init__("record", data)
|
||||
|
||||
|
||||
class Video(Segment):
|
||||
"""Video message segment."""
|
||||
|
||||
def __init__(self, file: str):
|
||||
super().__init__("video", {"file": file})
|
||||
|
||||
|
||||
class Reply(Segment):
|
||||
"""Reply message segment."""
|
||||
|
||||
def __init__(self, message_id: int):
|
||||
super().__init__("reply", {"id": str(message_id)})
|
||||
|
||||
|
||||
class MessageBuilder:
|
||||
"""Helper class for building complex messages."""
|
||||
|
||||
def __init__(self):
|
||||
self.segments: List[Segment] = []
|
||||
|
||||
def text(self, text: str) -> "MessageBuilder":
|
||||
"""Add a text segment."""
|
||||
self.segments.append(Text(text))
|
||||
return self
|
||||
|
||||
def face(self, face_id: int) -> "MessageBuilder":
|
||||
"""Add a face/emoji segment."""
|
||||
self.segments.append(Face(face_id))
|
||||
return self
|
||||
|
||||
def image(self, file: str = None) -> "MessageBuilder":
|
||||
"""Add an image segment."""
|
||||
self.segments.append(Image(file=file))
|
||||
return self
|
||||
|
||||
def at(self, user_id: Union[int, str]) -> "MessageBuilder":
|
||||
"""Add an @someone segment."""
|
||||
self.segments.append(At(user_id))
|
||||
return self
|
||||
|
||||
def record(self, file: str, magic: bool = False) -> "MessageBuilder":
|
||||
"""Add a voice record segment."""
|
||||
self.segments.append(Record(file, magic))
|
||||
return self
|
||||
|
||||
def video(self, file: str) -> "MessageBuilder":
|
||||
"""Add a video segment."""
|
||||
self.segments.append(Video(file))
|
||||
return self
|
||||
|
||||
def reply(self, message_id: int) -> "MessageBuilder":
|
||||
"""Add a reply segment."""
|
||||
self.segments.append(Reply(message_id))
|
||||
return self
|
||||
|
||||
def build(self) -> List[Dict[str, Any]]:
|
||||
"""Build the message into a list of segment dictionaries."""
|
||||
return [segment.to_dict() for segment in self.segments]
|
||||
|
||||
|
||||
'''Convenience functions
|
||||
def text(content: str) -> Dict[str, Any]:
|
||||
"""Create a text message segment."""
|
||||
return Text(content).to_dict()
|
||||
|
||||
def image_url(url: str) -> Dict[str, Any]:
|
||||
"""Create an image message segment from URL."""
|
||||
return Image.from_url(url).to_dict()
|
||||
|
||||
def image_path(path: str) -> Dict[str, Any]:
|
||||
"""Create an image message segment from file path."""
|
||||
return Image.from_path(path).to_dict()
|
||||
|
||||
def at(user_id: Union[int, str]) -> Dict[str, Any]:
|
||||
"""Create an @someone message segment."""
|
||||
return At(user_id).to_dict()'''
|
||||
@@ -17,7 +17,6 @@ from .relationship_manager import relationship_manager
|
||||
from .storage import MessageStorage
|
||||
from .utils import is_mentioned_bot_in_message
|
||||
from .utils_image import image_path_to_base64
|
||||
from .utils_user import get_user_nickname, get_user_cardname
|
||||
from ..willing.willing_manager import willing_manager # 导入意愿管理器
|
||||
from ..message import UserInfo, GroupInfo, Seg
|
||||
|
||||
|
||||
@@ -1,63 +0,0 @@
|
||||
def parse_cq_code(cq_code: str) -> dict:
|
||||
"""
|
||||
将CQ码解析为字典对象
|
||||
|
||||
Args:
|
||||
cq_code (str): CQ码字符串,如 [CQ:image,file=xxx.jpg,url=http://xxx]
|
||||
|
||||
Returns:
|
||||
dict: 包含type和参数的字典,如 {'type': 'image', 'data': {'file': 'xxx.jpg', 'url': 'http://xxx'}}
|
||||
"""
|
||||
# 检查是否是有效的CQ码
|
||||
if not (cq_code.startswith("[CQ:") and cq_code.endswith("]")):
|
||||
return {"type": "text", "data": {"text": cq_code}}
|
||||
|
||||
# 移除前后的 [CQ: 和 ]
|
||||
content = cq_code[4:-1]
|
||||
|
||||
# 分离类型和参数
|
||||
parts = content.split(",")
|
||||
if len(parts) < 1:
|
||||
return {"type": "text", "data": {"text": cq_code}}
|
||||
|
||||
cq_type = parts[0]
|
||||
params = {}
|
||||
|
||||
# 处理参数部分
|
||||
if len(parts) > 1:
|
||||
# 遍历所有参数
|
||||
for part in parts[1:]:
|
||||
if "=" in part:
|
||||
key, value = part.split("=", 1)
|
||||
params[key.strip()] = value.strip()
|
||||
|
||||
return {"type": cq_type, "data": params}
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
# 测试用例列表
|
||||
test_cases = [
|
||||
# 测试图片CQ码
|
||||
"[CQ:image,summary=,file={6E392FD2-AAA1-5192-F52A-F724A8EC7998}.gif,sub_type=1,url=https://gchat.qpic.cn/gchatpic_new/0/0-0-6E392FD2AAA15192F52AF724A8EC7998/0,file_size=861609]",
|
||||
# 测试at CQ码
|
||||
"[CQ:at,qq=123456]",
|
||||
# 测试普通文本
|
||||
"Hello World",
|
||||
# 测试face表情CQ码
|
||||
"[CQ:face,id=123]",
|
||||
# 测试含有多个逗号的URL
|
||||
"[CQ:image,url=https://example.com/image,with,commas.jpg]",
|
||||
# 测试空参数
|
||||
"[CQ:image,summary=]",
|
||||
# 测试非法CQ码
|
||||
"[CQ:]",
|
||||
"[CQ:invalid",
|
||||
]
|
||||
|
||||
# 测试每个用例
|
||||
for i, test_case in enumerate(test_cases, 1):
|
||||
print(f"\n测试用例 {i}:")
|
||||
print(f"输入: {test_case}")
|
||||
result = parse_cq_code(test_case)
|
||||
print(f"输出: {result}")
|
||||
print("-" * 50)
|
||||
@@ -1,20 +0,0 @@
|
||||
from .config import global_config
|
||||
from .relationship_manager import relationship_manager
|
||||
|
||||
|
||||
def get_user_nickname(user_id: int) -> str:
|
||||
if int(user_id) == int(global_config.BOT_QQ):
|
||||
return global_config.BOT_NICKNAME
|
||||
# print(user_id)
|
||||
return relationship_manager.get_name(int(user_id))
|
||||
|
||||
|
||||
def get_user_cardname(user_id: int) -> str:
|
||||
if int(user_id) == int(global_config.BOT_QQ):
|
||||
return global_config.BOT_NICKNAME
|
||||
# print(user_id)
|
||||
return ""
|
||||
|
||||
|
||||
def get_groupname(group_id: int) -> str:
|
||||
return f"群{group_id}"
|
||||
Reference in New Issue
Block a user