diff --git a/src/plugins/chat/emoji_manager.py b/src/plugins/chat/emoji_manager.py index 1cdb62c07..4784c0a3d 100644 --- a/src/plugins/chat/emoji_manager.py +++ b/src/plugins/chat/emoji_manager.py @@ -20,6 +20,7 @@ import traceback from nonebot import get_driver from ..chat.config import global_config from ..models.utils_model import LLM_request +from ..chat.utils_image import image_path_to_base64 from ..chat.utils import get_embedding driver = get_driver() @@ -196,76 +197,7 @@ class EmojiManager: except Exception as e: logger.error(f"获取标签失败: {str(e)}") return None - - async def _compress_image(self, image_path: str, target_size: int = 0.8 * 1024 * 1024) -> Optional[str]: - """压缩图片并返回base64编码 - Args: - image_path: 图片文件路径 - target_size: 目标文件大小(字节),默认0.8MB - Returns: - Optional[str]: 成功返回base64编码的图片数据,失败返回None - """ - try: - file_size = os.path.getsize(image_path) - if file_size <= target_size: - # 如果文件已经小于目标大小,直接读取并返回base64 - with open(image_path, 'rb') as f: - return base64.b64encode(f.read()).decode('utf-8') - - # 打开图片 - with Image.open(image_path) as img: - # 获取原始尺寸 - original_width, original_height = img.size - - # 计算缩放比例 - scale = min(1.0, (target_size / file_size) ** 0.5) - - # 计算新的尺寸 - new_width = int(original_width * scale) - new_height = int(original_height * scale) - - # 创建内存缓冲区 - output_buffer = io.BytesIO() - - # 如果是GIF,处理所有帧 - if getattr(img, "is_animated", False): - frames = [] - for frame_idx in range(img.n_frames): - img.seek(frame_idx) - new_frame = img.copy() - new_frame = new_frame.resize((new_width, new_height), Image.Resampling.LANCZOS) - frames.append(new_frame) - # 保存到缓冲区 - frames[0].save( - output_buffer, - format='GIF', - save_all=True, - append_images=frames[1:], - optimize=True, - duration=img.info.get('duration', 100), - loop=img.info.get('loop', 0) - ) - else: - # 处理静态图片 - resized_img = img.resize((new_width, new_height), Image.Resampling.LANCZOS) - - # 保存到缓冲区,保持原始格式 - if img.format == 'PNG' and img.mode in ('RGBA', 'LA'): - resized_img.save(output_buffer, format='PNG', optimize=True) - else: - resized_img.save(output_buffer, format='JPEG', quality=95, optimize=True) - - # 获取压缩后的数据并转换为base64 - compressed_data = output_buffer.getvalue() - logger.success(f"压缩图片: {os.path.basename(image_path)} ({original_width}x{original_height} -> {new_width}x{new_height})") - - return base64.b64encode(compressed_data).decode('utf-8') - - except Exception as e: - logger.error(f"压缩图片失败: {os.path.basename(image_path)}, 错误: {str(e)}") - return None - async def scan_new_emojis(self): """扫描新的表情包""" try: @@ -284,7 +216,7 @@ class EmojiManager: continue # 压缩图片并获取base64编码 - image_base64 = await self._compress_image(image_path) + image_base64 = image_path_to_base64(image_path) if image_base64 is None: os.remove(image_path) continue diff --git a/src/plugins/chat/utils_image.py b/src/plugins/chat/utils_image.py index d79c0a913..eff788868 100644 --- a/src/plugins/chat/utils_image.py +++ b/src/plugins/chat/utils_image.py @@ -289,4 +289,19 @@ def compress_base64_image_by_scale(base64_data: str, target_size: int = 0.8 * 10 logger.error(f"压缩图片失败: {str(e)}") import traceback logger.error(traceback.format_exc()) - return base64_data \ No newline at end of file + return base64_data + +def image_path_to_base64(image_path: str) -> str: + """将图片路径转换为base64编码 + Args: + image_path: 图片文件路径 + Returns: + str: base64编码的图片数据 + """ + try: + with open(image_path, 'rb') as f: + image_data = f.read() + return base64.b64encode(image_data).decode('utf-8') + except Exception as e: + logger.error(f"读取图片失败: {image_path}, 错误: {str(e)}") + return None \ No newline at end of file