fix: emoji压缩功能正常使用

This commit is contained in:
tcmofashi
2025-03-05 09:04:10 +08:00
parent 959011e369
commit f6e5a15ed2

View File

@@ -12,6 +12,8 @@ import base64
import shutil
import asyncio
import time
from PIL import Image
import io
from nonebot import get_driver
from ..chat.config import global_config
@@ -240,41 +242,102 @@ class EmojiManager:
print(f"\033[1;32m[调试信息]\033[0m 使用默认标签: neutral")
return "skip" # 默认标签
async def _compress_image(self, image_path: str, target_size: int = 4 * 1024 * 1024) -> Optional[str]:
"""压缩图片并返回base64编码
Args:
image_path: 图片文件路径
target_size: 目标文件大小字节默认4MB
Returns:
Optional[str]: 成功返回base64编码的图片数据失败返回None
"""
try:
file_size = os.path.getsize(image_path)
if file_size <= target_size:
# 如果文件已经小于目标大小直接读取并返回base64
with open(image_path, 'rb') as f:
return base64.b64encode(f.read()).decode('utf-8')
# 打开图片
with Image.open(image_path) as img:
# 获取原始尺寸
original_width, original_height = img.size
# 计算缩放比例
scale = min(1.0, (target_size / file_size) ** 0.5)
# 计算新的尺寸
new_width = int(original_width * scale)
new_height = int(original_height * scale)
# 创建内存缓冲区
output_buffer = io.BytesIO()
# 如果是GIF处理所有帧
if getattr(img, "is_animated", False):
frames = []
for frame_idx in range(img.n_frames):
img.seek(frame_idx)
new_frame = img.copy()
new_frame = new_frame.resize((new_width, new_height), Image.Resampling.LANCZOS)
frames.append(new_frame)
# 保存到缓冲区
frames[0].save(
output_buffer,
format='GIF',
save_all=True,
append_images=frames[1:],
optimize=True,
duration=img.info.get('duration', 100),
loop=img.info.get('loop', 0)
)
else:
# 处理静态图片
resized_img = img.resize((new_width, new_height), Image.Resampling.LANCZOS)
# 保存到缓冲区,保持原始格式
if img.format == 'PNG' and img.mode in ('RGBA', 'LA'):
resized_img.save(output_buffer, format='PNG', optimize=True)
else:
resized_img.save(output_buffer, format='JPEG', quality=95, optimize=True)
# 获取压缩后的数据并转换为base64
compressed_data = output_buffer.getvalue()
print(f"\033[1;32m[成功]\033[0m 压缩图片: {os.path.basename(image_path)} ({original_width}x{original_height} -> {new_width}x{new_height})")
return base64.b64encode(compressed_data).decode('utf-8')
except Exception as e:
print(f"\033[1;31m[错误]\033[0m 压缩图片失败: {os.path.basename(image_path)}, 错误: {str(e)}")
return None
async def scan_new_emojis(self):
"""扫描新的表情包"""
try:
emoji_dir = "data/emoji"
os.makedirs(emoji_dir, exist_ok=True)
# 获取所有jpg文件
files_to_process = [f for f in os.listdir(emoji_dir) if f.endswith('.jpg')]
# 获取所有支持的图片文件
files_to_process = [f for f in os.listdir(emoji_dir) if f.lower().endswith(('.jpg', '.jpeg', '.png', '.gif'))]
for filename in files_to_process:
image_path = os.path.join(emoji_dir, filename)
# 检查文件大小
file_size = os.path.getsize(image_path)
if file_size > 5 * 1024 * 1024: # 5MB
print(f"\033[1;33m[警告]\033[0m 表情包文件过大 ({file_size/1024/1024:.2f}MB),删除: {filename}")
os.remove(image_path)
continue
# 检查是否已经注册过
existing_emoji = self.db.db['emoji'].find_one({'filename': filename})
if existing_emoji:
continue
# 读取图片数据
with open(image_path, 'rb') as f:
image_data = f.read()
# 将图片转换为base64
image_base64 = base64.b64encode(image_data).decode('utf-8')
# 压缩图片并获取base64编码
image_base64 = await self._compress_image(image_path)
if image_base64 is None:
os.remove(image_path)
continue
# 获取表情包的情感标签
tag = await self._get_emoji_tag(image_base64)
if not tag == "skip":
# 准备数据库记录
# 准备数据库记录
emoji_record = {
'filename': filename,
'path': image_path,
@@ -288,7 +351,6 @@ class EmojiManager:
print(f"标签: {tag}")
else:
print(f"\033[1;33m[警告]\033[0m 跳过表情包: {filename}")
except Exception as e:
print(f"\033[1;31m[错误]\033[0m 扫描表情包失败: {str(e)}")