Files
Mofox-Core/src/plugins/chat/cq_code.py
SengokuCola 972e6066e6 v0.1
能跑但是没写部署教程,主题和记忆识别也没写完
2025-02-26 18:12:28 +08:00

422 lines
16 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

from dataclasses import dataclass
from typing import Dict, Optional
import html
import requests
import base64
from PIL import Image
import io
from .image_utils import storage_compress_image, storage_emoji
import os
from random import random
from nonebot.adapters.onebot.v11 import Bot
from .config import global_config, llm_config
import time
import asyncio
@dataclass
class CQCode:
"""
CQ码数据类用于存储和处理CQ码
属性:
type: CQ码类型'image', 'at', 'face'等)
params: CQ码的参数字典
raw_code: 原始CQ码字符串
translated_plain_text: 经过处理如AI翻译后的文本表示
"""
type: str
params: Dict[str, str]
raw_code: str
group_id: int
user_id: int
group_name: str = ""
user_nickname: str = ""
translated_plain_text: Optional[str] = None
reply_message: Dict = None # 存储回复消息
@classmethod
def from_cq_code(cls, cq_code: str, reply: Dict = None) -> 'CQCode':
"""
从CQ码字符串创建CQCode对象
例如:[CQ:image,file=1.jpg,url=http://example.com/1.jpg]
"""
# 移除前后的[]
content = cq_code[1:-1]
# 分离类型和参数部分
parts = content.split(',')
if not parts:
return cls('text', {'text': cq_code}, cq_code, group_id=0, user_id=0)
# 获取CQ类型
cq_type = parts[0][3:] # 去掉'CQ:'
# 解析参数
params = {}
for part in parts[1:]:
if '=' in part:
key, value = part.split('=', 1)
# 处理转义字符
value = cls.unescape(value)
params[key] = value
# 创建实例
instance = cls(cq_type, params, cq_code, group_id=0, user_id=0, reply_message=reply)
# 根据类型进行相应的翻译处理
instance.translate()
return instance
def translate(self):
"""根据CQ码类型进行相应的翻译处理"""
if self.type == 'text':
self.translated_plain_text = self.params.get('text', '')
elif self.type == 'image':
self.translated_plain_text = self.translate_image()
elif self.type == 'at':
from .message import Message
message_obj = Message(
user_id=str(self.params.get('qq', ''))
)
self.translated_plain_text = f"@{message_obj.user_nickname}"
elif self.type == 'reply':
self.translated_plain_text = self.translate_reply()
elif self.type == 'face':
face_id = self.params.get('id', '')
# self.translated_plain_text = f"[表情{face_id}]"
self.translated_plain_text = f"[表情]"
elif self.type == 'forward':
self.translated_plain_text = self.translate_forward()
else:
self.translated_plain_text = f"[{self.type}]"
def translate_image(self) -> str:
"""处理图片类型的CQ码区分普通图片和表情包"""
if 'url' not in self.params:
return '[图片]'
# 获取子类型,默认为普通图片(0)
sub_type = int(self.params.get('sub_type', '0'))
is_emoji = (sub_type == 1)
# 添加请求头
headers = {
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36',
'Accept': 'image/webp,image/apng,image/*,*/*;q=0.8',
'Accept-Encoding': 'gzip, deflate, br',
'Connection': 'keep-alive'
}
# 处理URL编码问题
url = html.unescape(self.params['url'])
if not url.startswith(('http://', 'https://')):
raise ValueError(f"无效的URL格式: {url}")
# 下载图片
response = requests.get(url, headers=headers, timeout=10, verify=False)
if response.status_code == 200:
# 检查响应内容类型
content_type = response.headers.get('content-type', '')
if not content_type.startswith('image/'):
raise ValueError(f"响应不是图片类型: {content_type}")
content = response.content
image_base64 = base64.b64encode(content).decode('utf-8')
# 根据子类型选择不同的处理方式
if sub_type == 1: # 表情包
return self.get_emoji_description(image_base64)
elif sub_type == 0: # 普通图片
if self.get_image_description_is_setu(image_base64) == "":
print(f"\033[1;34m[调试]\033[0m 哇!涩情图片")
# 使用相对路径创建目录
# data_dir = os.path.join(os.path.dirname(os.path.dirname(os.path.dirname(os.path.dirname(__file__)))), "data", "setu")
# os.makedirs(data_dir, exist_ok=True)
# # 生成随机文件名
# file_name = f"{int(time.time())}_{int(random() * 10000)}.jpg"
# file_path = os.path.join(data_dir, file_name)
# # 将base64解码并保存图片
# image_data = base64.b64decode(image_base64)
# with open(file_path, "wb") as f:
# f.write(image_data)
# print(f"\033[1;34m[调试]\033[0m 涩图已保存至: {file_path}")
return f"[一张涩情图片]"
return self.get_image_description(image_base64)
else: # 其他类型都按普通图片处理
return '[图片]'
else:
raise ValueError(f"下载图片失败: HTTP状态码 {response.status_code}")
def get_emoji_description(self, image_base64: str) -> str:
"""调用AI接口获取表情包描述"""
headers = {
"Content-Type": "application/json",
"Authorization": f"Bearer {llm_config.SILICONFLOW_API_KEY}"
}
payload = {
"model": "deepseek-ai/deepseek-vl2",
"messages": [
{
"role": "user",
"content": [
{
"type": "text",
"text": "这是一个表情包请用简短的中文描述这个表情包传达的情感和含义。最多20个字。"
},
{
"type": "image_url",
"image_url": {
"url": f"data:image/jpeg;base64,{image_base64}"
}
}
]
}
],
"max_tokens": 50,
"temperature": 0.4
}
response = requests.post(
f"{llm_config.SILICONFLOW_BASE_URL}chat/completions",
headers=headers,
json=payload,
timeout=30
)
if response.status_code == 200:
result_json = response.json()
if "choices" in result_json and len(result_json["choices"]) > 0:
description = result_json["choices"][0]["message"]["content"]
return f"[表情包:{description}]"
raise ValueError(f"AI接口调用失败: {response.text}")
def get_image_description(self, image_base64: str) -> str:
"""调用AI接口获取普通图片描述"""
headers = {
"Content-Type": "application/json",
"Authorization": f"Bearer {llm_config.SILICONFLOW_API_KEY}"
}
payload = {
"model": "deepseek-ai/deepseek-vl2",
"messages": [
{
"role": "user",
"content": [
{
"type": "text",
"text": "请用中文描述这张图片的内容。如果有文字请把文字都描述出来。并尝试猜测这个图片的含义。最多200个字。"
},
{
"type": "image_url",
"image_url": {
"url": f"data:image/jpeg;base64,{image_base64}"
}
}
]
}
],
"max_tokens": 300,
"temperature": 0.6
}
response = requests.post(
f"{llm_config.SILICONFLOW_BASE_URL}chat/completions",
headers=headers,
json=payload,
timeout=30
)
if response.status_code == 200:
result_json = response.json()
if "choices" in result_json and len(result_json["choices"]) > 0:
description = result_json["choices"][0]["message"]["content"]
return f"[图片:{description}]"
raise ValueError(f"AI接口调用失败: {response.text}")
def get_image_description_is_setu(self, image_base64: str) -> str:
"""调用AI接口获取普通图片描述"""
headers = {
"Content-Type": "application/json",
"Authorization": f"Bearer {llm_config.SILICONFLOW_API_KEY}"
}
payload = {
"model": "deepseek-ai/deepseek-vl2",
"messages": [
{
"role": "user",
"content": [
{
"type": "text",
"text": "请回答我这张图片是否涉及涩情、情色、裸露或性暗示,请严格判断,有任何涩情迹象就回答是,请用是或否回答"
},
{
"type": "image_url",
"image_url": {
"url": f"data:image/jpeg;base64,{image_base64}"
}
}
]
}
],
"max_tokens": 300,
"temperature": 0.6
}
response = requests.post(
f"{llm_config.SILICONFLOW_BASE_URL}chat/completions",
headers=headers,
json=payload,
timeout=30
)
if response.status_code == 200:
result_json = response.json()
if "choices" in result_json and len(result_json["choices"]) > 0:
description = result_json["choices"][0]["message"]["content"]
# 如果描述中包含"否",返回否,其他情况返回是
return "" if "" in description else ""
raise ValueError(f"AI接口调用失败: {response.text}")
def translate_forward(self) -> str:
"""处理转发消息"""
try:
if 'content' not in self.params:
return '[转发消息]'
# 解析content内容需要先反转义
content = self.unescape(self.params['content'])
# print(f"\033[1;34m[调试信息]\033[0m 转发消息内容: {content}")
# 将字符串形式的列表转换为Python对象
import ast
try:
messages = ast.literal_eval(content)
except ValueError as e:
print(f"\033[1;31m[错误]\033[0m 解析转发消息内容失败: {str(e)}")
return '[转发消息]'
# 处理每条消息
formatted_messages = []
for msg in messages:
sender = msg.get('sender', {})
nickname = sender.get('card') or sender.get('nickname', '未知用户')
# 获取消息内容并使用Message类处理
raw_message = msg.get('raw_message', '')
message_array = msg.get('message', [])
if message_array and isinstance(message_array, list):
# 检查是否包含嵌套的转发消息
for message_part in message_array:
if message_part.get('type') == 'forward':
content = '[转发消息]'
break
else:
# 处理普通消息
if raw_message:
from .message import Message
message_obj = Message(
user_id=msg.get('user_id', 0),
message_id=msg.get('message_id', 0),
raw_message=raw_message,
plain_text=raw_message,
group_id=msg.get('group_id', 0)
)
content = message_obj.processed_plain_text
else:
content = '[空消息]'
else:
# 处理普通消息
if raw_message:
from .message import Message
message_obj = Message(
user_id=msg.get('user_id', 0),
message_id=msg.get('message_id', 0),
raw_message=raw_message,
plain_text=raw_message,
group_id=msg.get('group_id', 0)
)
content = message_obj.processed_plain_text
else:
content = '[空消息]'
formatted_msg = f"{nickname}: {content}"
formatted_messages.append(formatted_msg)
# 合并所有消息
combined_messages = '\n'.join(formatted_messages)
print(f"\033[1;34m[调试信息]\033[0m 合并后的转发消息: {combined_messages}")
return f"[转发消息:\n{combined_messages}]"
except Exception as e:
print(f"\033[1;31m[错误]\033[0m 处理转发消息失败: {str(e)}")
return '[转发消息]'
def translate_reply(self) -> str:
"""处理回复类型的CQ码"""
# 创建Message对象
from .message import Message
if self.reply_message == None:
return '[回复某人消息]'
if self.reply_message.sender.user_id:
message_obj = Message(
user_id=self.reply_message.sender.user_id,
message_id=self.reply_message.message_id,
raw_message=str(self.reply_message.message),
group_id=self.group_id
)
if message_obj.user_id == global_config.BOT_QQ:
return f"[回复 麦麦 的消息: {message_obj.processed_plain_text}]"
else:
return f"[回复 {self.reply_message.sender.nickname} 的消息: {message_obj.processed_plain_text}]"
else:
return '[回复某人消息]'
@staticmethod
def unescape(text: str) -> str:
"""反转义CQ码中的特殊字符"""
return text.replace(',', ',') \
.replace('[', '[') \
.replace(']', ']') \
.replace('&', '&')
@staticmethod
def create_emoji_cq(file_path: str) -> str:
"""
创建表情包CQ码
Args:
file_path: 本地表情包文件路径
Returns:
表情包CQ码字符串
"""
# 确保使用绝对路径
abs_path = os.path.abspath(file_path)
# 转义特殊字符
escaped_path = abs_path.replace('&', '&') \
.replace('[', '[') \
.replace(']', ']') \
.replace(',', ',')
# 生成CQ码设置sub_type=1表示这是表情包
return f"[CQ:image,file=file:///{escaped_path},sub_type=1]"
@staticmethod
def create_reply_cq(message_id: int) -> str:
"""
创建回复CQ码
Args:
message_id: 回复的消息ID
Returns:
回复CQ码字符串
"""
return f"[CQ:reply,id={message_id}]"