fix: 去除emoji_manager中的图片压缩
This commit is contained in:
@@ -20,6 +20,7 @@ import traceback
|
|||||||
from nonebot import get_driver
|
from nonebot import get_driver
|
||||||
from ..chat.config import global_config
|
from ..chat.config import global_config
|
||||||
from ..models.utils_model import LLM_request
|
from ..models.utils_model import LLM_request
|
||||||
|
from ..chat.utils_image import image_path_to_base64
|
||||||
from ..chat.utils import get_embedding
|
from ..chat.utils import get_embedding
|
||||||
|
|
||||||
driver = get_driver()
|
driver = get_driver()
|
||||||
@@ -197,75 +198,6 @@ class EmojiManager:
|
|||||||
logger.error(f"获取标签失败: {str(e)}")
|
logger.error(f"获取标签失败: {str(e)}")
|
||||||
return None
|
return None
|
||||||
|
|
||||||
async def _compress_image(self, image_path: str, target_size: int = 0.8 * 1024 * 1024) -> Optional[str]:
|
|
||||||
"""压缩图片并返回base64编码
|
|
||||||
Args:
|
|
||||||
image_path: 图片文件路径
|
|
||||||
target_size: 目标文件大小(字节),默认0.8MB
|
|
||||||
Returns:
|
|
||||||
Optional[str]: 成功返回base64编码的图片数据,失败返回None
|
|
||||||
"""
|
|
||||||
try:
|
|
||||||
file_size = os.path.getsize(image_path)
|
|
||||||
if file_size <= target_size:
|
|
||||||
# 如果文件已经小于目标大小,直接读取并返回base64
|
|
||||||
with open(image_path, 'rb') as f:
|
|
||||||
return base64.b64encode(f.read()).decode('utf-8')
|
|
||||||
|
|
||||||
# 打开图片
|
|
||||||
with Image.open(image_path) as img:
|
|
||||||
# 获取原始尺寸
|
|
||||||
original_width, original_height = img.size
|
|
||||||
|
|
||||||
# 计算缩放比例
|
|
||||||
scale = min(1.0, (target_size / file_size) ** 0.5)
|
|
||||||
|
|
||||||
# 计算新的尺寸
|
|
||||||
new_width = int(original_width * scale)
|
|
||||||
new_height = int(original_height * scale)
|
|
||||||
|
|
||||||
# 创建内存缓冲区
|
|
||||||
output_buffer = io.BytesIO()
|
|
||||||
|
|
||||||
# 如果是GIF,处理所有帧
|
|
||||||
if getattr(img, "is_animated", False):
|
|
||||||
frames = []
|
|
||||||
for frame_idx in range(img.n_frames):
|
|
||||||
img.seek(frame_idx)
|
|
||||||
new_frame = img.copy()
|
|
||||||
new_frame = new_frame.resize((new_width, new_height), Image.Resampling.LANCZOS)
|
|
||||||
frames.append(new_frame)
|
|
||||||
|
|
||||||
# 保存到缓冲区
|
|
||||||
frames[0].save(
|
|
||||||
output_buffer,
|
|
||||||
format='GIF',
|
|
||||||
save_all=True,
|
|
||||||
append_images=frames[1:],
|
|
||||||
optimize=True,
|
|
||||||
duration=img.info.get('duration', 100),
|
|
||||||
loop=img.info.get('loop', 0)
|
|
||||||
)
|
|
||||||
else:
|
|
||||||
# 处理静态图片
|
|
||||||
resized_img = img.resize((new_width, new_height), Image.Resampling.LANCZOS)
|
|
||||||
|
|
||||||
# 保存到缓冲区,保持原始格式
|
|
||||||
if img.format == 'PNG' and img.mode in ('RGBA', 'LA'):
|
|
||||||
resized_img.save(output_buffer, format='PNG', optimize=True)
|
|
||||||
else:
|
|
||||||
resized_img.save(output_buffer, format='JPEG', quality=95, optimize=True)
|
|
||||||
|
|
||||||
# 获取压缩后的数据并转换为base64
|
|
||||||
compressed_data = output_buffer.getvalue()
|
|
||||||
logger.success(f"压缩图片: {os.path.basename(image_path)} ({original_width}x{original_height} -> {new_width}x{new_height})")
|
|
||||||
|
|
||||||
return base64.b64encode(compressed_data).decode('utf-8')
|
|
||||||
|
|
||||||
except Exception as e:
|
|
||||||
logger.error(f"压缩图片失败: {os.path.basename(image_path)}, 错误: {str(e)}")
|
|
||||||
return None
|
|
||||||
|
|
||||||
async def scan_new_emojis(self):
|
async def scan_new_emojis(self):
|
||||||
"""扫描新的表情包"""
|
"""扫描新的表情包"""
|
||||||
try:
|
try:
|
||||||
@@ -284,7 +216,7 @@ class EmojiManager:
|
|||||||
continue
|
continue
|
||||||
|
|
||||||
# 压缩图片并获取base64编码
|
# 压缩图片并获取base64编码
|
||||||
image_base64 = await self._compress_image(image_path)
|
image_base64 = image_path_to_base64(image_path)
|
||||||
if image_base64 is None:
|
if image_base64 is None:
|
||||||
os.remove(image_path)
|
os.remove(image_path)
|
||||||
continue
|
continue
|
||||||
|
|||||||
@@ -290,3 +290,18 @@ def compress_base64_image_by_scale(base64_data: str, target_size: int = 0.8 * 10
|
|||||||
import traceback
|
import traceback
|
||||||
logger.error(traceback.format_exc())
|
logger.error(traceback.format_exc())
|
||||||
return base64_data
|
return base64_data
|
||||||
|
|
||||||
|
def image_path_to_base64(image_path: str) -> str:
|
||||||
|
"""将图片路径转换为base64编码
|
||||||
|
Args:
|
||||||
|
image_path: 图片文件路径
|
||||||
|
Returns:
|
||||||
|
str: base64编码的图片数据
|
||||||
|
"""
|
||||||
|
try:
|
||||||
|
with open(image_path, 'rb') as f:
|
||||||
|
image_data = f.read()
|
||||||
|
return base64.b64encode(image_data).decode('utf-8')
|
||||||
|
except Exception as e:
|
||||||
|
logger.error(f"读取图片失败: {image_path}, 错误: {str(e)}")
|
||||||
|
return None
|
||||||
Reference in New Issue
Block a user