@@ -14,9 +14,23 @@ from .utils_image import storage_image,storage_emoji
|
||||
from .utils_user import get_user_nickname
|
||||
#解析各种CQ码
|
||||
#包含CQ码类
|
||||
import urllib3
|
||||
from urllib3.util import create_urllib3_context
|
||||
|
||||
# TLS1.3特殊处理 https://github.com/psf/requests/issues/6616
|
||||
ctx = create_urllib3_context()
|
||||
ctx.load_default_certs()
|
||||
ctx.set_ciphers("AES128-GCM-SHA256")
|
||||
|
||||
class TencentSSLAdapter(requests.adapters.HTTPAdapter):
|
||||
def __init__(self, ssl_context=None, **kwargs):
|
||||
self.ssl_context = ssl_context
|
||||
super().__init__(**kwargs)
|
||||
|
||||
def init_poolmanager(self, connections, maxsize, block=False):
|
||||
self.poolmanager = urllib3.poolmanager.PoolManager(
|
||||
num_pools=connections, maxsize=maxsize,
|
||||
block=block, ssl_context=self.ssl_context)
|
||||
|
||||
@dataclass
|
||||
class CQCode:
|
||||
@@ -42,7 +56,6 @@ class CQCode:
|
||||
|
||||
def translate(self):
|
||||
"""根据CQ码类型进行相应的翻译处理"""
|
||||
print(f"\033[1;34m[调试信息]\033[0m 开始翻译CQ{self.params}码: {self.type}")
|
||||
if self.type == 'text':
|
||||
self.translated_plain_text = self.params.get('text', '')
|
||||
elif self.type == 'image':
|
||||
@@ -78,7 +91,7 @@ class CQCode:
|
||||
'Pragma': 'no-cache'
|
||||
}
|
||||
'''
|
||||
|
||||
# 腾讯专用请求头配置
|
||||
headers = {
|
||||
'User-Agent': 'Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/50.0.2661.87 Safari/537.36',
|
||||
'Accept': 'text/html, application/xhtml xml, */*',
|
||||
@@ -89,38 +102,51 @@ class CQCode:
|
||||
}
|
||||
url = html.unescape(self.params['url'])
|
||||
if not url.startswith(('http://', 'https://')):
|
||||
return None # 直接返回而不是抛出异常
|
||||
|
||||
return None
|
||||
|
||||
# 创建专用会话
|
||||
session = requests.session()
|
||||
session.adapters.pop("https://", None)
|
||||
session.mount("https://", TencentSSLAdapter(ctx))
|
||||
|
||||
max_retries = 3
|
||||
for retry in range(max_retries):
|
||||
try:
|
||||
response = requests.get(url, headers=headers, timeout=10, verify=False)
|
||||
if response.status_code == 200:
|
||||
break
|
||||
elif response.status_code == 400 and 'multimedia.nt.qq.com.cn' in url:
|
||||
# 对于腾讯多媒体服务器的链接,直接返回图片描述
|
||||
response = session.get(
|
||||
url,
|
||||
headers=headers,
|
||||
timeout=15,
|
||||
allow_redirects=True,
|
||||
stream=True # 流式传输避免大内存问题
|
||||
)
|
||||
|
||||
# 腾讯服务器特殊状态码处理
|
||||
if response.status_code == 400 and 'multimedia.nt.qq.com.cn' in url:
|
||||
return None
|
||||
time.sleep(1) # 重试前等待1秒
|
||||
except requests.RequestException:
|
||||
|
||||
if response.status_code != 200:
|
||||
raise requests.exceptions.HTTPError(f"HTTP {response.status_code}")
|
||||
|
||||
# 验证内容类型
|
||||
content_type = response.headers.get('Content-Type', '')
|
||||
if not content_type.startswith('image/'):
|
||||
raise ValueError(f"非图片内容类型: {content_type}")
|
||||
|
||||
# 转换为Base64
|
||||
image_base64 = base64.b64encode(response.content).decode('utf-8')
|
||||
self.image_base64 = image_base64
|
||||
return image_base64
|
||||
|
||||
except (requests.exceptions.SSLError, requests.exceptions.HTTPError) as e:
|
||||
if retry == max_retries - 1:
|
||||
raise
|
||||
time.sleep(1)
|
||||
if response.status_code != 200:
|
||||
print(f"\033[1;31m[警告]\033[0m 图片下载失败: HTTP {response.status_code}, URL: {url}")
|
||||
return None # 直接返回而不是抛出异常
|
||||
#检查是否为图片
|
||||
content_type = response.headers.get('content-type', '')
|
||||
if not content_type.startswith('image/'):
|
||||
print(f"\033[1;31m[警告]\033[0m 非图片类型响应: {content_type}")
|
||||
return None # 直接返回而不是抛出异常
|
||||
content = response.content
|
||||
image_base64 = base64.b64encode(content).decode('utf-8')
|
||||
if image_base64:
|
||||
self.image_base64 = image_base64
|
||||
|
||||
return image_base64
|
||||
else:
|
||||
return None
|
||||
print(f"\033[1;31m[致命错误]\033[0m 最终请求失败: {str(e)}")
|
||||
time.sleep(1.5 ** retry) # 指数退避
|
||||
|
||||
except Exception as e:
|
||||
print(f"\033[1;33m[未知错误]\033[0m {str(e)}")
|
||||
return None
|
||||
|
||||
return None
|
||||
|
||||
def translate_emoji(self) -> str:
|
||||
"""处理表情包类型的CQ码"""
|
||||
|
||||
Reference in New Issue
Block a user