v0.2.3
修复了表情包无法偷的bug
This commit is contained in:
BIN
requirements.txt
BIN
requirements.txt
Binary file not shown.
@@ -4,15 +4,14 @@ import html
|
|||||||
import requests
|
import requests
|
||||||
import base64
|
import base64
|
||||||
from PIL import Image
|
from PIL import Image
|
||||||
import io
|
|
||||||
from .image_utils import storage_compress_image, storage_emoji
|
|
||||||
import os
|
import os
|
||||||
from random import random
|
from random import random
|
||||||
from nonebot.adapters.onebot.v11 import Bot
|
from nonebot.adapters.onebot.v11 import Bot
|
||||||
from .config import global_config, llm_config
|
from .config import global_config, llm_config
|
||||||
import time
|
import time
|
||||||
import asyncio
|
import asyncio
|
||||||
|
from .utils_image import storage_image,storage_emoji
|
||||||
|
from .utils_user import get_user_nickname
|
||||||
#解析各种CQ码
|
#解析各种CQ码
|
||||||
#包含CQ码类
|
#包含CQ码类
|
||||||
|
|
||||||
@@ -85,11 +84,11 @@ class CQCode:
|
|||||||
else:
|
else:
|
||||||
self.translated_plain_text = self.translate_emoji()
|
self.translated_plain_text = self.translate_emoji()
|
||||||
elif self.type == 'at':
|
elif self.type == 'at':
|
||||||
from .message import Message
|
user_nickname = get_user_nickname(self.params.get('qq', ''))
|
||||||
message_obj = Message(
|
if user_nickname:
|
||||||
user_id=str(self.params.get('qq', ''))
|
self.translated_plain_text = f"[@{user_nickname}]"
|
||||||
)
|
else:
|
||||||
self.translated_plain_text = f"@{message_obj.user_nickname}"
|
self.translated_plain_text = f"[@某人]"
|
||||||
elif self.type == 'reply':
|
elif self.type == 'reply':
|
||||||
self.translated_plain_text = self.translate_reply()
|
self.translated_plain_text = self.translate_reply()
|
||||||
elif self.type == 'face':
|
elif self.type == 'face':
|
||||||
@@ -151,6 +150,7 @@ class CQCode:
|
|||||||
image_base64 = base64.b64encode(content).decode('utf-8')
|
image_base64 = base64.b64encode(content).decode('utf-8')
|
||||||
if image_base64:
|
if image_base64:
|
||||||
self.image_base64 = image_base64
|
self.image_base64 = image_base64
|
||||||
|
|
||||||
return image_base64
|
return image_base64
|
||||||
else:
|
else:
|
||||||
return None
|
return None
|
||||||
@@ -159,9 +159,12 @@ class CQCode:
|
|||||||
"""处理表情包类型的CQ码"""
|
"""处理表情包类型的CQ码"""
|
||||||
if 'url' not in self.params:
|
if 'url' not in self.params:
|
||||||
return '[表情包]'
|
return '[表情包]'
|
||||||
base64 = self.get_img()
|
base64_str = self.get_img()
|
||||||
if base64:
|
if base64_str:
|
||||||
return self.get_image_description(base64)
|
# 将 base64 字符串转换为字节类型
|
||||||
|
image_bytes = base64.b64decode(base64_str)
|
||||||
|
storage_emoji(image_bytes)
|
||||||
|
return self.get_image_description(base64_str)
|
||||||
else:
|
else:
|
||||||
return '[表情包]'
|
return '[表情包]'
|
||||||
|
|
||||||
@@ -171,9 +174,11 @@ class CQCode:
|
|||||||
#没有url,直接返回默认文本
|
#没有url,直接返回默认文本
|
||||||
if 'url' not in self.params:
|
if 'url' not in self.params:
|
||||||
return '[图片]'
|
return '[图片]'
|
||||||
base64 = self.get_img()
|
base64_str = self.get_img()
|
||||||
if base64:
|
if base64_str:
|
||||||
return self.get_image_description(base64)
|
image_bytes = base64.b64decode(base64_str)
|
||||||
|
storage_image(image_bytes)
|
||||||
|
return self.get_image_description(base64_str)
|
||||||
else:
|
else:
|
||||||
return '[图片]'
|
return '[图片]'
|
||||||
|
|
||||||
|
|||||||
@@ -183,7 +183,7 @@ class MessageSendControl:
|
|||||||
if isinstance(message, Message_Thinking):
|
if isinstance(message, Message_Thinking):
|
||||||
message.update_thinking_time()
|
message.update_thinking_time()
|
||||||
thinking_time = message.thinking_time
|
thinking_time = message.thinking_time
|
||||||
if thinking_time < 60: # 最少思考2秒
|
if thinking_time < 90: # 最少思考2秒
|
||||||
if int(thinking_time) % 10 == 0:
|
if int(thinking_time) % 10 == 0:
|
||||||
print(f"\033[1;34m[调试]\033[0m 消息正在思考中,已思考{thinking_time:.1f}秒")
|
print(f"\033[1;34m[调试]\033[0m 消息正在思考中,已思考{thinking_time:.1f}秒")
|
||||||
return
|
return
|
||||||
|
|||||||
@@ -5,7 +5,6 @@ from collections import defaultdict
|
|||||||
import asyncio
|
import asyncio
|
||||||
from .message import Message
|
from .message import Message
|
||||||
from ...common.database import Database
|
from ...common.database import Database
|
||||||
from .image_utils import storage_compress_image
|
|
||||||
|
|
||||||
class MessageStorage:
|
class MessageStorage:
|
||||||
def __init__(self):
|
def __init__(self):
|
||||||
|
|||||||
@@ -10,6 +10,16 @@ import base64
|
|||||||
|
|
||||||
bot_config = BotConfig.load_config()
|
bot_config = BotConfig.load_config()
|
||||||
|
|
||||||
|
|
||||||
|
def storage_image(image_data: bytes,type: str, max_size: int = 200) -> bytes:
|
||||||
|
if type == 'image':
|
||||||
|
return storage_compress_image(image_data, max_size)
|
||||||
|
elif type == 'emoji':
|
||||||
|
return storage_emoji(image_data)
|
||||||
|
else:
|
||||||
|
raise ValueError(f"不支持的图片类型: {type}")
|
||||||
|
|
||||||
|
|
||||||
def storage_compress_image(image_data: bytes, max_size: int = 200) -> bytes:
|
def storage_compress_image(image_data: bytes, max_size: int = 200) -> bytes:
|
||||||
"""
|
"""
|
||||||
压缩图片到指定大小(单位:KB)并在数据库中记录图片信息
|
压缩图片到指定大小(单位:KB)并在数据库中记录图片信息
|
||||||
@@ -162,4 +172,45 @@ def storage_emoji(image_data: bytes) -> bytes:
|
|||||||
|
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
print(f"\033[1;31m[错误]\033[0m 保存表情包失败: {str(e)}")
|
print(f"\033[1;31m[错误]\033[0m 保存表情包失败: {str(e)}")
|
||||||
|
return image_data
|
||||||
|
|
||||||
|
|
||||||
|
def storage_image(image_data: bytes) -> bytes:
|
||||||
|
"""
|
||||||
|
存储图片到本地文件夹
|
||||||
|
Args:
|
||||||
|
image_data: 图片字节数据
|
||||||
|
group_id: 群组ID(仅用于日志)
|
||||||
|
user_id: 用户ID(仅用于日志)
|
||||||
|
Returns:
|
||||||
|
bytes: 原始图片数据
|
||||||
|
"""
|
||||||
|
try:
|
||||||
|
# 使用 CRC32 计算哈希值
|
||||||
|
hash_value = format(zlib.crc32(image_data) & 0xFFFFFFFF, 'x')
|
||||||
|
|
||||||
|
# 确保表情包目录存在
|
||||||
|
image_dir = "data/image"
|
||||||
|
os.makedirs(image_dir, exist_ok=True)
|
||||||
|
|
||||||
|
# 检查是否已存在相同哈希值的文件
|
||||||
|
for filename in os.listdir(image_dir):
|
||||||
|
if hash_value in filename:
|
||||||
|
# print(f"\033[1;33m[提示]\033[0m 发现重复表情包: {filename}")
|
||||||
|
return image_data
|
||||||
|
|
||||||
|
# 生成文件名
|
||||||
|
timestamp = int(time.time())
|
||||||
|
filename = f"{timestamp}_{hash_value}.jpg"
|
||||||
|
image_path = os.path.join(image_dir, filename)
|
||||||
|
|
||||||
|
# 直接保存原始文件
|
||||||
|
with open(image_path, "wb") as f:
|
||||||
|
f.write(image_data)
|
||||||
|
|
||||||
|
print(f"\033[1;32m[成功]\033[0m 保存图片到: {image_path}")
|
||||||
|
return image_data
|
||||||
|
|
||||||
|
except Exception as e:
|
||||||
|
print(f"\033[1;31m[错误]\033[0m 保存图片失败: {str(e)}")
|
||||||
return image_data
|
return image_data
|
||||||
Reference in New Issue
Block a user