feat: 超大型重构
This commit is contained in:
@@ -3,7 +3,7 @@ import html
|
||||
import os
|
||||
import time
|
||||
from dataclasses import dataclass
|
||||
from typing import Dict, Optional
|
||||
from typing import Dict, Optional, List, Union
|
||||
|
||||
import requests
|
||||
|
||||
@@ -12,12 +12,14 @@ import requests
|
||||
import urllib3
|
||||
from nonebot import get_driver
|
||||
from urllib3.util import create_urllib3_context
|
||||
from loguru import logger
|
||||
|
||||
from ..models.utils_model import LLM_request
|
||||
from .config import global_config
|
||||
from .mapper import emojimapper
|
||||
from .utils_image import storage_emoji, storage_image
|
||||
from .utils_image import image_manager
|
||||
from .utils_user import get_user_nickname
|
||||
from .message_base import Seg
|
||||
|
||||
driver = get_driver()
|
||||
config = driver.config
|
||||
@@ -48,16 +50,15 @@ class CQCode:
|
||||
type: CQ码类型(如'image', 'at', 'face'等)
|
||||
params: CQ码的参数字典
|
||||
raw_code: 原始CQ码字符串
|
||||
translated_plain_text: 经过处理(如AI翻译)后的文本表示
|
||||
translated_segments: 经过处理后的Seg对象列表
|
||||
"""
|
||||
type: str
|
||||
params: Dict[str, str]
|
||||
# raw_code: str
|
||||
group_id: int
|
||||
user_id: int
|
||||
group_name: str = ""
|
||||
user_nickname: str = ""
|
||||
translated_plain_text: Optional[str] = None
|
||||
translated_segments: Optional[Union[Seg, List[Seg]]] = None
|
||||
reply_message: Dict = None # 存储回复消息
|
||||
image_base64: Optional[str] = None
|
||||
_llm: Optional[LLM_request] = None
|
||||
@@ -66,31 +67,72 @@ class CQCode:
|
||||
"""初始化LLM实例"""
|
||||
self._llm = LLM_request(model=global_config.vlm, temperature=0.4, max_tokens=300)
|
||||
|
||||
async def translate(self):
|
||||
"""根据CQ码类型进行相应的翻译处理"""
|
||||
def translate(self):
|
||||
"""根据CQ码类型进行相应的翻译处理,转换为Seg对象"""
|
||||
if self.type == 'text':
|
||||
self.translated_plain_text = self.params.get('text', '')
|
||||
self.translated_segments = Seg(
|
||||
type='text',
|
||||
data=self.params.get('text', '')
|
||||
)
|
||||
elif self.type == 'image':
|
||||
if self.params.get('sub_type') == '0':
|
||||
self.translated_plain_text = await self.translate_image()
|
||||
base64_data = self.translate_image()
|
||||
if base64_data:
|
||||
if self.params.get('sub_type') == '0':
|
||||
self.translated_segments = Seg(
|
||||
type='image',
|
||||
data=base64_data
|
||||
)
|
||||
else:
|
||||
self.translated_segments = Seg(
|
||||
type='emoji',
|
||||
data=base64_data
|
||||
)
|
||||
else:
|
||||
self.translated_plain_text = await self.translate_emoji()
|
||||
self.translated_segments = Seg(
|
||||
type='text',
|
||||
data='[图片]'
|
||||
)
|
||||
elif self.type == 'at':
|
||||
user_nickname = get_user_nickname(self.params.get('qq', ''))
|
||||
if user_nickname:
|
||||
self.translated_plain_text = f"[@{user_nickname}]"
|
||||
else:
|
||||
self.translated_plain_text = "@某人"
|
||||
self.translated_segments = Seg(
|
||||
type='text',
|
||||
data=f"[@{user_nickname or '某人'}]"
|
||||
)
|
||||
elif self.type == 'reply':
|
||||
self.translated_plain_text = await self.translate_reply()
|
||||
reply_segments = self.translate_reply()
|
||||
if reply_segments:
|
||||
self.translated_segments = Seg(
|
||||
type='seglist',
|
||||
data=reply_segments
|
||||
)
|
||||
else:
|
||||
self.translated_segments = Seg(
|
||||
type='text',
|
||||
data='[回复某人消息]'
|
||||
)
|
||||
elif self.type == 'face':
|
||||
face_id = self.params.get('id', '')
|
||||
# self.translated_plain_text = f"[表情{face_id}]"
|
||||
self.translated_plain_text = f"[{emojimapper.get(int(face_id), '表情')}]"
|
||||
self.translated_segments = Seg(
|
||||
type='text',
|
||||
data=f"[{emojimapper.get(int(face_id), '表情')}]"
|
||||
)
|
||||
elif self.type == 'forward':
|
||||
self.translated_plain_text = await self.translate_forward()
|
||||
forward_segments = self.translate_forward()
|
||||
if forward_segments:
|
||||
self.translated_segments = Seg(
|
||||
type='seglist',
|
||||
data=forward_segments
|
||||
)
|
||||
else:
|
||||
self.translated_segments = Seg(
|
||||
type='text',
|
||||
data='[转发消息]'
|
||||
)
|
||||
else:
|
||||
self.translated_plain_text = f"[{self.type}]"
|
||||
self.translated_segments = Seg(
|
||||
type='text',
|
||||
data=f"[{self.type}]"
|
||||
)
|
||||
|
||||
def get_img(self):
|
||||
'''
|
||||
@@ -160,155 +202,101 @@ class CQCode:
|
||||
|
||||
return None
|
||||
|
||||
async def translate_emoji(self) -> str:
|
||||
"""处理表情包类型的CQ码"""
|
||||
|
||||
def translate_image(self) -> Optional[str]:
|
||||
"""处理图片类型的CQ码,返回base64字符串"""
|
||||
if 'url' not in self.params:
|
||||
return '[表情包]'
|
||||
base64_str = self.get_img()
|
||||
if base64_str:
|
||||
# 将 base64 字符串转换为字节类型
|
||||
image_bytes = base64.b64decode(base64_str)
|
||||
storage_emoji(image_bytes)
|
||||
return await self.get_emoji_description(base64_str)
|
||||
else:
|
||||
return '[表情包]'
|
||||
return None
|
||||
return self.get_img()
|
||||
|
||||
async def translate_image(self) -> str:
|
||||
"""处理图片类型的CQ码,区分普通图片和表情包"""
|
||||
# 没有url,直接返回默认文本
|
||||
if 'url' not in self.params:
|
||||
return '[图片]'
|
||||
base64_str = self.get_img()
|
||||
if base64_str:
|
||||
image_bytes = base64.b64decode(base64_str)
|
||||
storage_image(image_bytes)
|
||||
return await self.get_image_description(base64_str)
|
||||
else:
|
||||
return '[图片]'
|
||||
|
||||
async def get_emoji_description(self, image_base64: str) -> str:
|
||||
"""调用AI接口获取表情包描述"""
|
||||
try:
|
||||
prompt = "这是一个表情包,请用简短的中文描述这个表情包传达的情感和含义。最多20个字。"
|
||||
# description, _ = self._llm.generate_response_for_image_sync(prompt, image_base64)
|
||||
description, _ = await self._llm.generate_response_for_image(prompt, image_base64)
|
||||
return f"[表情包:{description}]"
|
||||
except Exception as e:
|
||||
print(f"\033[1;31m[错误]\033[0m AI接口调用失败: {str(e)}")
|
||||
return "[表情包]"
|
||||
|
||||
async def get_image_description(self, image_base64: str) -> str:
|
||||
"""调用AI接口获取普通图片描述"""
|
||||
try:
|
||||
prompt = "请用中文描述这张图片的内容。如果有文字,请把文字都描述出来。并尝试猜测这个图片的含义。最多200个字。"
|
||||
# description, _ = self._llm.generate_response_for_image_sync(prompt, image_base64)
|
||||
description, _ = await self._llm.generate_response_for_image(prompt, image_base64)
|
||||
return f"[图片:{description}]"
|
||||
except Exception as e:
|
||||
print(f"\033[1;31m[错误]\033[0m AI接口调用失败: {str(e)}")
|
||||
return "[图片]"
|
||||
|
||||
async def translate_forward(self) -> str:
|
||||
"""处理转发消息"""
|
||||
def translate_forward(self) -> Optional[List[Seg]]:
|
||||
"""处理转发消息,返回Seg列表"""
|
||||
try:
|
||||
if 'content' not in self.params:
|
||||
return '[转发消息]'
|
||||
return None
|
||||
|
||||
# 解析content内容(需要先反转义)
|
||||
content = self.unescape(self.params['content'])
|
||||
# print(f"\033[1;34m[调试信息]\033[0m 转发消息内容: {content}")
|
||||
# 将字符串形式的列表转换为Python对象
|
||||
import ast
|
||||
try:
|
||||
messages = ast.literal_eval(content)
|
||||
except ValueError as e:
|
||||
print(f"\033[1;31m[错误]\033[0m 解析转发消息内容失败: {str(e)}")
|
||||
return '[转发消息]'
|
||||
logger.error(f"解析转发消息内容失败: {str(e)}")
|
||||
return None
|
||||
|
||||
# 处理每条消息
|
||||
formatted_messages = []
|
||||
formatted_segments = []
|
||||
for msg in messages:
|
||||
sender = msg.get('sender', {})
|
||||
nickname = sender.get('card') or sender.get('nickname', '未知用户')
|
||||
|
||||
# 获取消息内容并使用Message类处理
|
||||
raw_message = msg.get('raw_message', '')
|
||||
message_array = msg.get('message', [])
|
||||
|
||||
if message_array and isinstance(message_array, list):
|
||||
# 检查是否包含嵌套的转发消息
|
||||
for message_part in message_array:
|
||||
if message_part.get('type') == 'forward':
|
||||
content = '[转发消息]'
|
||||
content_seg = Seg(type='text', data='[转发消息]')
|
||||
break
|
||||
else:
|
||||
# 处理普通消息
|
||||
if raw_message:
|
||||
from .message import Message
|
||||
message_obj = Message(
|
||||
user_id=msg.get('user_id', 0),
|
||||
message_id=msg.get('message_id', 0),
|
||||
raw_message=raw_message,
|
||||
plain_text=raw_message,
|
||||
group_id=msg.get('group_id', 0)
|
||||
)
|
||||
await message_obj.initialize()
|
||||
content = message_obj.processed_plain_text
|
||||
else:
|
||||
content = '[空消息]'
|
||||
if raw_message:
|
||||
from .message_cq import MessageRecvCQ
|
||||
message_obj = MessageRecvCQ(
|
||||
user_id=msg.get('user_id', 0),
|
||||
message_id=msg.get('message_id', 0),
|
||||
raw_message=raw_message,
|
||||
plain_text=raw_message,
|
||||
group_id=msg.get('group_id', 0)
|
||||
)
|
||||
content_seg = Seg(type='seglist', data=message_obj.message_segments)
|
||||
else:
|
||||
content_seg = Seg(type='text', data='[空消息]')
|
||||
else:
|
||||
# 处理普通消息
|
||||
if raw_message:
|
||||
from .message import Message
|
||||
message_obj = Message(
|
||||
from .message_cq import MessageRecvCQ
|
||||
message_obj = MessageRecvCQ(
|
||||
user_id=msg.get('user_id', 0),
|
||||
message_id=msg.get('message_id', 0),
|
||||
raw_message=raw_message,
|
||||
plain_text=raw_message,
|
||||
group_id=msg.get('group_id', 0)
|
||||
)
|
||||
await message_obj.initialize()
|
||||
content = message_obj.processed_plain_text
|
||||
content_seg = Seg(type='seglist', data=message_obj.message_segments)
|
||||
else:
|
||||
content = '[空消息]'
|
||||
content_seg = Seg(type='text', data='[空消息]')
|
||||
|
||||
formatted_msg = f"{nickname}: {content}"
|
||||
formatted_messages.append(formatted_msg)
|
||||
formatted_segments.append(Seg(type='text', data=f"{nickname}: "))
|
||||
formatted_segments.append(content_seg)
|
||||
formatted_segments.append(Seg(type='text', data='\n'))
|
||||
|
||||
# 合并所有消息
|
||||
combined_messages = '\n'.join(formatted_messages)
|
||||
print(f"\033[1;34m[调试信息]\033[0m 合并后的转发消息: {combined_messages}")
|
||||
return f"[转发消息:\n{combined_messages}]"
|
||||
return formatted_segments
|
||||
|
||||
except Exception as e:
|
||||
print(f"\033[1;31m[错误]\033[0m 处理转发消息失败: {str(e)}")
|
||||
return '[转发消息]'
|
||||
logger.error(f"处理转发消息失败: {str(e)}")
|
||||
return None
|
||||
|
||||
async def translate_reply(self) -> str:
|
||||
"""处理回复类型的CQ码"""
|
||||
|
||||
# 创建Message对象
|
||||
from .message import Message
|
||||
if self.reply_message == None:
|
||||
# print(f"\033[1;31m[错误]\033[0m 回复消息为空")
|
||||
return '[回复某人消息]'
|
||||
def translate_reply(self) -> Optional[List[Seg]]:
|
||||
"""处理回复类型的CQ码,返回Seg列表"""
|
||||
from .message_cq import MessageRecvCQ
|
||||
if self.reply_message is None:
|
||||
return None
|
||||
|
||||
if self.reply_message.sender.user_id:
|
||||
message_obj = Message(
|
||||
message_obj = MessageRecvCQ(
|
||||
user_id=self.reply_message.sender.user_id,
|
||||
message_id=self.reply_message.message_id,
|
||||
raw_message=str(self.reply_message.message),
|
||||
group_id=self.group_id
|
||||
)
|
||||
await message_obj.initialize()
|
||||
|
||||
segments = []
|
||||
if message_obj.user_id == global_config.BOT_QQ:
|
||||
return f"[回复 {global_config.BOT_NICKNAME} 的消息: {message_obj.processed_plain_text}]"
|
||||
segments.append(Seg(type='text', data=f"[回复 {global_config.BOT_NICKNAME} 的消息: "))
|
||||
else:
|
||||
return f"[回复 {self.reply_message.sender.nickname} 的消息: {message_obj.processed_plain_text}]"
|
||||
|
||||
segments.append(Seg(type='text', data=f"[回复 {self.reply_message.sender.nickname} 的消息: "))
|
||||
|
||||
segments.append(Seg(type='seglist', data=message_obj.message_segments))
|
||||
segments.append(Seg(type='text', data="]"))
|
||||
return segments
|
||||
else:
|
||||
print("\033[1;31m[错误]\033[0m 回复消息的sender.user_id为空")
|
||||
return '[回复某人消息]'
|
||||
return None
|
||||
|
||||
@staticmethod
|
||||
def unescape(text: str) -> str:
|
||||
@@ -318,29 +306,12 @@ class CQCode:
|
||||
.replace(']', ']') \
|
||||
.replace('&', '&')
|
||||
|
||||
@staticmethod
|
||||
def create_emoji_cq(file_path: str) -> str:
|
||||
"""
|
||||
创建表情包CQ码
|
||||
Args:
|
||||
file_path: 本地表情包文件路径
|
||||
Returns:
|
||||
表情包CQ码字符串
|
||||
"""
|
||||
# 确保使用绝对路径
|
||||
abs_path = os.path.abspath(file_path)
|
||||
# 转义特殊字符
|
||||
escaped_path = abs_path.replace('&', '&') \
|
||||
.replace('[', '[') \
|
||||
.replace(']', ']') \
|
||||
.replace(',', ',')
|
||||
# 生成CQ码,设置sub_type=1表示这是表情包
|
||||
return f"[CQ:image,file=file:///{escaped_path},sub_type=1]"
|
||||
|
||||
|
||||
|
||||
class CQCode_tool:
|
||||
@staticmethod
|
||||
async def cq_from_dict_to_class(cq_code: Dict, reply: Optional[Dict] = None) -> CQCode:
|
||||
def cq_from_dict_to_class(cq_code: Dict, reply: Optional[Dict] = None) -> CQCode:
|
||||
"""
|
||||
将CQ码字典转换为CQCode对象
|
||||
|
||||
@@ -369,7 +340,7 @@ class CQCode_tool:
|
||||
)
|
||||
|
||||
# 进行翻译处理
|
||||
await instance.translate()
|
||||
instance.translate()
|
||||
return instance
|
||||
|
||||
@staticmethod
|
||||
@@ -382,6 +353,27 @@ class CQCode_tool:
|
||||
回复CQ码字符串
|
||||
"""
|
||||
return f"[CQ:reply,id={message_id}]"
|
||||
|
||||
@staticmethod
|
||||
def create_emoji_cq(file_path: str) -> str:
|
||||
"""
|
||||
创建表情包CQ码
|
||||
Args:
|
||||
file_path: 本地表情包文件路径
|
||||
Returns:
|
||||
表情包CQ码字符串
|
||||
"""
|
||||
# 确保使用绝对路径
|
||||
abs_path = os.path.abspath(file_path)
|
||||
# 转义特殊字符
|
||||
escaped_path = abs_path.replace('&', '&') \
|
||||
.replace('[', '[') \
|
||||
.replace(']', ']') \
|
||||
.replace(',', ',')
|
||||
# 生成CQ码,设置sub_type=1表示这是表情包
|
||||
return f"[CQ:image,file=file:///{escaped_path},sub_type=1]"
|
||||
|
||||
|
||||
|
||||
|
||||
cq_code_tool = CQCode_tool()
|
||||
|
||||
Reference in New Issue
Block a user