feat: 重构完成开始测试debug
This commit is contained in:
@@ -3,23 +3,22 @@ import html
|
||||
import os
|
||||
import time
|
||||
from dataclasses import dataclass
|
||||
from typing import Dict, Optional, List, Union
|
||||
from typing import Dict, List, Optional, Union
|
||||
|
||||
import requests
|
||||
|
||||
# 解析各种CQ码
|
||||
# 包含CQ码类
|
||||
import urllib3
|
||||
from loguru import logger
|
||||
from nonebot import get_driver
|
||||
from urllib3.util import create_urllib3_context
|
||||
from loguru import logger
|
||||
|
||||
from ..models.utils_model import LLM_request
|
||||
from .config import global_config
|
||||
from .mapper import emojimapper
|
||||
from .utils_image import image_manager
|
||||
from .utils_user import get_user_nickname
|
||||
from .message_base import Seg
|
||||
from .utils_user import get_user_nickname
|
||||
|
||||
driver = get_driver()
|
||||
config = driver.config
|
||||
@@ -37,21 +36,25 @@ class TencentSSLAdapter(requests.adapters.HTTPAdapter):
|
||||
|
||||
def init_poolmanager(self, connections, maxsize, block=False):
|
||||
self.poolmanager = urllib3.poolmanager.PoolManager(
|
||||
num_pools=connections, maxsize=maxsize,
|
||||
block=block, ssl_context=self.ssl_context)
|
||||
num_pools=connections,
|
||||
maxsize=maxsize,
|
||||
block=block,
|
||||
ssl_context=self.ssl_context,
|
||||
)
|
||||
|
||||
|
||||
@dataclass
|
||||
class CQCode:
|
||||
"""
|
||||
CQ码数据类,用于存储和处理CQ码
|
||||
|
||||
|
||||
属性:
|
||||
type: CQ码类型(如'image', 'at', 'face'等)
|
||||
params: CQ码的参数字典
|
||||
raw_code: 原始CQ码字符串
|
||||
translated_segments: 经过处理后的Seg对象列表
|
||||
"""
|
||||
|
||||
type: str
|
||||
params: Dict[str, str]
|
||||
group_id: int
|
||||
@@ -65,77 +68,52 @@ class CQCode:
|
||||
|
||||
def __post_init__(self):
|
||||
"""初始化LLM实例"""
|
||||
self._llm = LLM_request(model=global_config.vlm, temperature=0.4, max_tokens=300)
|
||||
self._llm = LLM_request(
|
||||
model=global_config.vlm, temperature=0.4, max_tokens=300
|
||||
)
|
||||
|
||||
def translate(self):
|
||||
"""根据CQ码类型进行相应的翻译处理,转换为Seg对象"""
|
||||
if self.type == 'text':
|
||||
if self.type == "text":
|
||||
self.translated_segments = Seg(
|
||||
type='text',
|
||||
data=self.params.get('text', '')
|
||||
type="text", data=self.params.get("text", "")
|
||||
)
|
||||
elif self.type == 'image':
|
||||
elif self.type == "image":
|
||||
base64_data = self.translate_image()
|
||||
if base64_data:
|
||||
if self.params.get('sub_type') == '0':
|
||||
self.translated_segments = Seg(
|
||||
type='image',
|
||||
data=base64_data
|
||||
)
|
||||
if self.params.get("sub_type") == "0":
|
||||
self.translated_segments = Seg(type="image", data=base64_data)
|
||||
else:
|
||||
self.translated_segments = Seg(
|
||||
type='emoji',
|
||||
data=base64_data
|
||||
)
|
||||
self.translated_segments = Seg(type="emoji", data=base64_data)
|
||||
else:
|
||||
self.translated_segments = Seg(
|
||||
type='text',
|
||||
data='[图片]'
|
||||
)
|
||||
elif self.type == 'at':
|
||||
user_nickname = get_user_nickname(self.params.get('qq', ''))
|
||||
self.translated_segments = Seg(type="text", data="[图片]")
|
||||
elif self.type == "at":
|
||||
user_nickname = get_user_nickname(self.params.get("qq", ""))
|
||||
self.translated_segments = Seg(
|
||||
type='text',
|
||||
data=f"[@{user_nickname or '某人'}]"
|
||||
type="text", data=f"[@{user_nickname or '某人'}]"
|
||||
)
|
||||
elif self.type == 'reply':
|
||||
elif self.type == "reply":
|
||||
reply_segments = self.translate_reply()
|
||||
if reply_segments:
|
||||
self.translated_segments = Seg(
|
||||
type='seglist',
|
||||
data=reply_segments
|
||||
)
|
||||
self.translated_segments = Seg(type="seglist", data=reply_segments)
|
||||
else:
|
||||
self.translated_segments = Seg(
|
||||
type='text',
|
||||
data='[回复某人消息]'
|
||||
)
|
||||
elif self.type == 'face':
|
||||
face_id = self.params.get('id', '')
|
||||
self.translated_segments = Seg(type="text", data="[回复某人消息]")
|
||||
elif self.type == "face":
|
||||
face_id = self.params.get("id", "")
|
||||
self.translated_segments = Seg(
|
||||
type='text',
|
||||
data=f"[{emojimapper.get(int(face_id), '表情')}]"
|
||||
type="text", data=f"[{emojimapper.get(int(face_id), '表情')}]"
|
||||
)
|
||||
elif self.type == 'forward':
|
||||
elif self.type == "forward":
|
||||
forward_segments = self.translate_forward()
|
||||
if forward_segments:
|
||||
self.translated_segments = Seg(
|
||||
type='seglist',
|
||||
data=forward_segments
|
||||
)
|
||||
self.translated_segments = Seg(type="seglist", data=forward_segments)
|
||||
else:
|
||||
self.translated_segments = Seg(
|
||||
type='text',
|
||||
data='[转发消息]'
|
||||
)
|
||||
self.translated_segments = Seg(type="text", data="[转发消息]")
|
||||
else:
|
||||
self.translated_segments = Seg(
|
||||
type='text',
|
||||
data=f"[{self.type}]"
|
||||
)
|
||||
self.translated_segments = Seg(type="text", data=f"[{self.type}]")
|
||||
|
||||
def get_img(self):
|
||||
'''
|
||||
"""
|
||||
headers = {
|
||||
'User-Agent': 'QQ/8.9.68.11565 CFNetwork/1220.1 Darwin/20.3.0',
|
||||
'Accept': 'image/*;q=0.8',
|
||||
@@ -144,18 +122,18 @@ class CQCode:
|
||||
'Cache-Control': 'no-cache',
|
||||
'Pragma': 'no-cache'
|
||||
}
|
||||
'''
|
||||
"""
|
||||
# 腾讯专用请求头配置
|
||||
headers = {
|
||||
'User-Agent': 'Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/50.0.2661.87 Safari/537.36',
|
||||
'Accept': 'text/html, application/xhtml xml, */*',
|
||||
'Accept-Encoding': 'gbk, GB2312',
|
||||
'Accept-Language': 'zh-cn',
|
||||
'Content-Type': 'application/x-www-form-urlencoded',
|
||||
'Cache-Control': 'no-cache'
|
||||
"User-Agent": "Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/50.0.2661.87 Safari/537.36",
|
||||
"Accept": "text/html, application/xhtml xml, */*",
|
||||
"Accept-Encoding": "gbk, GB2312",
|
||||
"Accept-Language": "zh-cn",
|
||||
"Content-Type": "application/x-www-form-urlencoded",
|
||||
"Cache-Control": "no-cache",
|
||||
}
|
||||
url = html.unescape(self.params['url'])
|
||||
if not url.startswith(('http://', 'https://')):
|
||||
url = html.unescape(self.params["url"])
|
||||
if not url.startswith(("http://", "https://")):
|
||||
return None
|
||||
|
||||
# 创建专用会话
|
||||
@@ -171,30 +149,30 @@ class CQCode:
|
||||
headers=headers,
|
||||
timeout=15,
|
||||
allow_redirects=True,
|
||||
stream=True # 流式传输避免大内存问题
|
||||
stream=True, # 流式传输避免大内存问题
|
||||
)
|
||||
|
||||
# 腾讯服务器特殊状态码处理
|
||||
if response.status_code == 400 and 'multimedia.nt.qq.com.cn' in url:
|
||||
if response.status_code == 400 and "multimedia.nt.qq.com.cn" in url:
|
||||
return None
|
||||
|
||||
if response.status_code != 200:
|
||||
raise requests.exceptions.HTTPError(f"HTTP {response.status_code}")
|
||||
|
||||
# 验证内容类型
|
||||
content_type = response.headers.get('Content-Type', '')
|
||||
if not content_type.startswith('image/'):
|
||||
content_type = response.headers.get("Content-Type", "")
|
||||
if not content_type.startswith("image/"):
|
||||
raise ValueError(f"非图片内容类型: {content_type}")
|
||||
|
||||
# 转换为Base64
|
||||
image_base64 = base64.b64encode(response.content).decode('utf-8')
|
||||
image_base64 = base64.b64encode(response.content).decode("utf-8")
|
||||
self.image_base64 = image_base64
|
||||
return image_base64
|
||||
|
||||
except (requests.exceptions.SSLError, requests.exceptions.HTTPError) as e:
|
||||
if retry == max_retries - 1:
|
||||
print(f"\033[1;31m[致命错误]\033[0m 最终请求失败: {str(e)}")
|
||||
time.sleep(1.5 ** retry) # 指数退避
|
||||
time.sleep(1.5**retry) # 指数退避
|
||||
|
||||
except Exception as e:
|
||||
print(f"\033[1;33m[未知错误]\033[0m {str(e)}")
|
||||
@@ -202,21 +180,21 @@ class CQCode:
|
||||
|
||||
return None
|
||||
|
||||
|
||||
def translate_image(self) -> Optional[str]:
|
||||
"""处理图片类型的CQ码,返回base64字符串"""
|
||||
if 'url' not in self.params:
|
||||
if "url" not in self.params:
|
||||
return None
|
||||
return self.get_img()
|
||||
|
||||
def translate_forward(self) -> Optional[List[Seg]]:
|
||||
"""处理转发消息,返回Seg列表"""
|
||||
try:
|
||||
if 'content' not in self.params:
|
||||
if "content" not in self.params:
|
||||
return None
|
||||
|
||||
content = self.unescape(self.params['content'])
|
||||
content = self.unescape(self.params["content"])
|
||||
import ast
|
||||
|
||||
try:
|
||||
messages = ast.literal_eval(content)
|
||||
except ValueError as e:
|
||||
@@ -225,46 +203,52 @@ class CQCode:
|
||||
|
||||
formatted_segments = []
|
||||
for msg in messages:
|
||||
sender = msg.get('sender', {})
|
||||
nickname = sender.get('card') or sender.get('nickname', '未知用户')
|
||||
raw_message = msg.get('raw_message', '')
|
||||
message_array = msg.get('message', [])
|
||||
sender = msg.get("sender", {})
|
||||
nickname = sender.get("card") or sender.get("nickname", "未知用户")
|
||||
raw_message = msg.get("raw_message", "")
|
||||
message_array = msg.get("message", [])
|
||||
|
||||
if message_array and isinstance(message_array, list):
|
||||
for message_part in message_array:
|
||||
if message_part.get('type') == 'forward':
|
||||
content_seg = Seg(type='text', data='[转发消息]')
|
||||
if message_part.get("type") == "forward":
|
||||
content_seg = Seg(type="text", data="[转发消息]")
|
||||
break
|
||||
else:
|
||||
if raw_message:
|
||||
from .message_cq import MessageRecvCQ
|
||||
|
||||
message_obj = MessageRecvCQ(
|
||||
user_id=msg.get('user_id', 0),
|
||||
message_id=msg.get('message_id', 0),
|
||||
user_id=msg.get("user_id", 0),
|
||||
message_id=msg.get("message_id", 0),
|
||||
raw_message=raw_message,
|
||||
plain_text=raw_message,
|
||||
group_id=msg.get('group_id', 0)
|
||||
group_id=msg.get("group_id", 0),
|
||||
)
|
||||
content_seg = Seg(
|
||||
type="seglist", data=message_obj.message_segments
|
||||
)
|
||||
content_seg = Seg(type='seglist', data=message_obj.message_segments)
|
||||
else:
|
||||
content_seg = Seg(type='text', data='[空消息]')
|
||||
content_seg = Seg(type="text", data="[空消息]")
|
||||
else:
|
||||
if raw_message:
|
||||
from .message_cq import MessageRecvCQ
|
||||
|
||||
message_obj = MessageRecvCQ(
|
||||
user_id=msg.get('user_id', 0),
|
||||
message_id=msg.get('message_id', 0),
|
||||
user_id=msg.get("user_id", 0),
|
||||
message_id=msg.get("message_id", 0),
|
||||
raw_message=raw_message,
|
||||
plain_text=raw_message,
|
||||
group_id=msg.get('group_id', 0)
|
||||
group_id=msg.get("group_id", 0),
|
||||
)
|
||||
content_seg = Seg(
|
||||
type="seglist", data=message_obj.message_segments
|
||||
)
|
||||
content_seg = Seg(type='seglist', data=message_obj.message_segments)
|
||||
else:
|
||||
content_seg = Seg(type='text', data='[空消息]')
|
||||
content_seg = Seg(type="text", data="[空消息]")
|
||||
|
||||
formatted_segments.append(Seg(type='text', data=f"{nickname}: "))
|
||||
formatted_segments.append(Seg(type="text", data=f"{nickname}: "))
|
||||
formatted_segments.append(content_seg)
|
||||
formatted_segments.append(Seg(type='text', data='\n'))
|
||||
formatted_segments.append(Seg(type="text", data="\n"))
|
||||
|
||||
return formatted_segments
|
||||
|
||||
@@ -275,6 +259,7 @@ class CQCode:
|
||||
def translate_reply(self) -> Optional[List[Seg]]:
|
||||
"""处理回复类型的CQ码,返回Seg列表"""
|
||||
from .message_cq import MessageRecvCQ
|
||||
|
||||
if self.reply_message is None:
|
||||
return None
|
||||
|
||||
@@ -283,17 +268,26 @@ class CQCode:
|
||||
user_id=self.reply_message.sender.user_id,
|
||||
message_id=self.reply_message.message_id,
|
||||
raw_message=str(self.reply_message.message),
|
||||
group_id=self.group_id
|
||||
group_id=self.group_id,
|
||||
)
|
||||
|
||||
|
||||
segments = []
|
||||
if message_obj.user_id == global_config.BOT_QQ:
|
||||
segments.append(Seg(type='text', data=f"[回复 {global_config.BOT_NICKNAME} 的消息: "))
|
||||
segments.append(
|
||||
Seg(
|
||||
type="text", data=f"[回复 {global_config.BOT_NICKNAME} 的消息: "
|
||||
)
|
||||
)
|
||||
else:
|
||||
segments.append(Seg(type='text', data=f"[回复 {self.reply_message.sender.nickname} 的消息: "))
|
||||
|
||||
segments.append(Seg(type='seglist', data=message_obj.message_segments))
|
||||
segments.append(Seg(type='text', data="]"))
|
||||
segments.append(
|
||||
Seg(
|
||||
type="text",
|
||||
data=f"[回复 {self.reply_message.sender.nickname} 的消息: ",
|
||||
)
|
||||
)
|
||||
|
||||
segments.append(Seg(type="seglist", data=message_obj.message_segments))
|
||||
segments.append(Seg(type="text", data="]"))
|
||||
return segments
|
||||
else:
|
||||
return None
|
||||
@@ -301,12 +295,12 @@ class CQCode:
|
||||
@staticmethod
|
||||
def unescape(text: str) -> str:
|
||||
"""反转义CQ码中的特殊字符"""
|
||||
return text.replace(',', ',') \
|
||||
.replace('[', '[') \
|
||||
.replace(']', ']') \
|
||||
.replace('&', '&')
|
||||
|
||||
|
||||
return (
|
||||
text.replace(",", ",")
|
||||
.replace("[", "[")
|
||||
.replace("]", "]")
|
||||
.replace("&", "&")
|
||||
)
|
||||
|
||||
|
||||
class CQCode_tool:
|
||||
@@ -314,29 +308,25 @@ class CQCode_tool:
|
||||
def cq_from_dict_to_class(cq_code: Dict, reply: Optional[Dict] = None) -> CQCode:
|
||||
"""
|
||||
将CQ码字典转换为CQCode对象
|
||||
|
||||
|
||||
Args:
|
||||
cq_code: CQ码字典
|
||||
reply: 回复消息的字典(可选)
|
||||
|
||||
|
||||
Returns:
|
||||
CQCode对象
|
||||
"""
|
||||
# 处理字典形式的CQ码
|
||||
# 从cq_code字典中获取type字段的值,如果不存在则默认为'text'
|
||||
cq_type = cq_code.get('type', 'text')
|
||||
cq_type = cq_code.get("type", "text")
|
||||
params = {}
|
||||
if cq_type == 'text':
|
||||
params['text'] = cq_code.get('data', {}).get('text', '')
|
||||
if cq_type == "text":
|
||||
params["text"] = cq_code.get("data", {}).get("text", "")
|
||||
else:
|
||||
params = cq_code.get('data', {})
|
||||
params = cq_code.get("data", {})
|
||||
|
||||
instance = CQCode(
|
||||
type=cq_type,
|
||||
params=params,
|
||||
group_id=0,
|
||||
user_id=0,
|
||||
reply_message=reply
|
||||
type=cq_type, params=params, group_id=0, user_id=0, reply_message=reply
|
||||
)
|
||||
|
||||
# 进行翻译处理
|
||||
@@ -353,7 +343,7 @@ class CQCode_tool:
|
||||
回复CQ码字符串
|
||||
"""
|
||||
return f"[CQ:reply,id={message_id}]"
|
||||
|
||||
|
||||
@staticmethod
|
||||
def create_emoji_cq(file_path: str) -> str:
|
||||
"""
|
||||
@@ -366,13 +356,15 @@ class CQCode_tool:
|
||||
# 确保使用绝对路径
|
||||
abs_path = os.path.abspath(file_path)
|
||||
# 转义特殊字符
|
||||
escaped_path = abs_path.replace('&', '&') \
|
||||
.replace('[', '[') \
|
||||
.replace(']', ']') \
|
||||
.replace(',', ',')
|
||||
escaped_path = (
|
||||
abs_path.replace("&", "&")
|
||||
.replace("[", "[")
|
||||
.replace("]", "]")
|
||||
.replace(",", ",")
|
||||
)
|
||||
# 生成CQ码,设置sub_type=1表示这是表情包
|
||||
return f"[CQ:image,file=file:///{escaped_path},sub_type=1]"
|
||||
|
||||
|
||||
@staticmethod
|
||||
def create_emoji_cq_base64(base64_data: str) -> str:
|
||||
"""
|
||||
@@ -383,15 +375,14 @@ class CQCode_tool:
|
||||
表情包CQ码字符串
|
||||
"""
|
||||
# 转义base64数据
|
||||
escaped_base64 = base64_data.replace('&', '&') \
|
||||
.replace('[', '[') \
|
||||
.replace(']', ']') \
|
||||
.replace(',', ',')
|
||||
escaped_base64 = (
|
||||
base64_data.replace("&", "&")
|
||||
.replace("[", "[")
|
||||
.replace("]", "]")
|
||||
.replace(",", ",")
|
||||
)
|
||||
# 生成CQ码,设置sub_type=1表示这是表情包
|
||||
return f"[CQ:image,file=base64://{escaped_base64},sub_type=1]"
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
cq_code_tool = CQCode_tool()
|
||||
|
||||
Reference in New Issue
Block a user