fix: update CQCode and Message classes for async initialization and processing

This commit is contained in:
KawaiiYusora
2025-03-07 01:31:03 +08:00
parent 26f99664ee
commit e0e3ee4177
5 changed files with 112 additions and 103 deletions

View File

@@ -10,11 +10,11 @@ from nonebot.adapters.onebot.v11 import Bot
from .config import global_config
import time
import asyncio
from .utils_image import storage_image,storage_emoji
from .utils_image import storage_image, storage_emoji
from .utils_user import get_user_nickname
from ..models.utils_model import LLM_request
#解析各种CQ码
#包含CQ码类
# 解析各种CQ码
# 包含CQ码类
import urllib3
from urllib3.util import create_urllib3_context
from nonebot import get_driver
@@ -27,6 +27,7 @@ ctx = create_urllib3_context()
ctx.load_default_certs()
ctx.set_ciphers("AES128-GCM-SHA256")
class TencentSSLAdapter(requests.adapters.HTTPAdapter):
def __init__(self, ssl_context=None, **kwargs):
self.ssl_context = ssl_context
@@ -37,6 +38,7 @@ class TencentSSLAdapter(requests.adapters.HTTPAdapter):
num_pools=connections, maxsize=maxsize,
block=block, ssl_context=self.ssl_context)
@dataclass
class CQCode:
"""
@@ -80,13 +82,13 @@ class CQCode:
else:
self.translated_plain_text = f"@某人"
elif self.type == 'reply':
self.translated_plain_text = self.translate_reply()
self.translated_plain_text = await self.translate_reply()
elif self.type == 'face':
face_id = self.params.get('id', '')
# self.translated_plain_text = f"[表情{face_id}]"
self.translated_plain_text = f"[表情]"
elif self.type == 'forward':
self.translated_plain_text = self.translate_forward()
self.translated_plain_text = await self.translate_forward()
else:
self.translated_plain_text = f"[{self.type}]"
@@ -133,7 +135,7 @@ class CQCode:
# 腾讯服务器特殊状态码处理
if response.status_code == 400 and 'multimedia.nt.qq.com.cn' in url:
return None
if response.status_code != 200:
raise requests.exceptions.HTTPError(f"HTTP {response.status_code}")
@@ -157,7 +159,7 @@ class CQCode:
return None
return None
async def translate_emoji(self) -> str:
"""处理表情包类型的CQ码"""
if 'url' not in self.params:
@@ -170,11 +172,10 @@ class CQCode:
return await self.get_emoji_description(base64_str)
else:
return '[表情包]'
async def translate_image(self) -> str:
"""处理图片类型的CQ码区分普通图片和表情包"""
#没有url直接返回默认文本
# 没有url直接返回默认文本
if 'url' not in self.params:
return '[图片]'
base64_str = self.get_img()
@@ -206,13 +207,13 @@ class CQCode:
except Exception as e:
print(f"\033[1;31m[错误]\033[0m AI接口调用失败: {str(e)}")
return "[图片]"
def translate_forward(self) -> str:
async def translate_forward(self) -> str:
"""处理转发消息"""
try:
if 'content' not in self.params:
return '[转发消息]'
# 解析content内容需要先反转义
content = self.unescape(self.params['content'])
# print(f"\033[1;34m[调试信息]\033[0m 转发消息内容: {content}")
@@ -223,17 +224,17 @@ class CQCode:
except ValueError as e:
print(f"\033[1;31m[错误]\033[0m 解析转发消息内容失败: {str(e)}")
return '[转发消息]'
# 处理每条消息
formatted_messages = []
for msg in messages:
sender = msg.get('sender', {})
nickname = sender.get('card') or sender.get('nickname', '未知用户')
# 获取消息内容并使用Message类处理
raw_message = msg.get('raw_message', '')
message_array = msg.get('message', [])
if message_array and isinstance(message_array, list):
# 检查是否包含嵌套的转发消息
for message_part in message_array:
@@ -251,6 +252,7 @@ class CQCode:
plain_text=raw_message,
group_id=msg.get('group_id', 0)
)
await message_obj.initialize()
content = message_obj.processed_plain_text
else:
content = '[空消息]'
@@ -265,23 +267,24 @@ class CQCode:
plain_text=raw_message,
group_id=msg.get('group_id', 0)
)
await message_obj.initialize()
content = message_obj.processed_plain_text
else:
content = '[空消息]'
formatted_msg = f"{nickname}: {content}"
formatted_messages.append(formatted_msg)
# 合并所有消息
combined_messages = '\n'.join(formatted_messages)
print(f"\033[1;34m[调试信息]\033[0m 合并后的转发消息: {combined_messages}")
return f"[转发消息:\n{combined_messages}]"
except Exception as e:
print(f"\033[1;31m[错误]\033[0m 处理转发消息失败: {str(e)}")
return '[转发消息]'
def translate_reply(self) -> str:
async def translate_reply(self) -> str:
"""处理回复类型的CQ码"""
# 创建Message对象
@@ -289,7 +292,7 @@ class CQCode:
if self.reply_message == None:
# print(f"\033[1;31m[错误]\033[0m 回复消息为空")
return '[回复某人消息]'
if self.reply_message.sender.user_id:
message_obj = Message(
user_id=self.reply_message.sender.user_id,
@@ -297,6 +300,7 @@ class CQCode:
raw_message=str(self.reply_message.message),
group_id=self.group_id
)
await message_obj.initialize()
if message_obj.user_id == global_config.BOT_QQ:
return f"[回复 {global_config.BOT_NICKNAME} 的消息: {message_obj.processed_plain_text}]"
else:
@@ -310,9 +314,9 @@ class CQCode:
def unescape(text: str) -> str:
"""反转义CQ码中的特殊字符"""
return text.replace(',', ',') \
.replace('[', '[') \
.replace(']', ']') \
.replace('&', '&')
.replace('[', '[') \
.replace(']', ']') \
.replace('&', '&')
@staticmethod
def create_emoji_cq(file_path: str) -> str:
@@ -327,12 +331,13 @@ class CQCode:
abs_path = os.path.abspath(file_path)
# 转义特殊字符
escaped_path = abs_path.replace('&', '&') \
.replace('[', '[') \
.replace(']', ']') \
.replace(',', ',')
.replace('[', '[') \
.replace(']', ']') \
.replace(',', ',')
# 生成CQ码设置sub_type=1表示这是表情包
return f"[CQ:image,file=file:///{escaped_path},sub_type=1]"
class CQCode_tool:
@staticmethod
async def cq_from_dict_to_class(cq_code: Dict, reply: Optional[Dict] = None) -> CQCode:
@@ -354,7 +359,7 @@ class CQCode_tool:
params['text'] = cq_code.get('data', {}).get('text', '')
else:
params = cq_code.get('data', {})
instance = CQCode(
type=cq_type,
params=params,
@@ -362,11 +367,11 @@ class CQCode_tool:
user_id=0,
reply_message=reply
)
# 进行翻译处理
await instance.translate()
return instance
@staticmethod
def create_reply_cq(message_id: int) -> str:
"""
@@ -377,6 +382,6 @@ class CQCode_tool:
回复CQ码字符串
"""
return f"[CQ:reply,id={message_id}]"
cq_code_tool = CQCode_tool()