555 lines
23 KiB
Python
555 lines
23 KiB
Python
import base64
|
||
import os
|
||
import time
|
||
import hashlib
|
||
import uuid
|
||
from typing import Optional, Tuple
|
||
from PIL import Image
|
||
import io
|
||
import numpy as np
|
||
import asyncio
|
||
|
||
|
||
from src.common.database.database import db
|
||
from src.common.database.database_model import Images, ImageDescriptions
|
||
from src.config.config import global_config
|
||
from src.llm_models.utils_model import LLMRequest
|
||
|
||
from src.common.logger import get_logger
|
||
from rich.traceback import install
|
||
|
||
install(extra_lines=3)
|
||
|
||
logger = get_logger("chat_image")
|
||
|
||
|
||
class ImageManager:
|
||
_instance = None
|
||
IMAGE_DIR = "data" # 图像存储根目录
|
||
|
||
def __new__(cls):
|
||
if cls._instance is None:
|
||
cls._instance = super().__new__(cls)
|
||
cls._instance._initialized = False
|
||
return cls._instance
|
||
|
||
def __init__(self):
|
||
if not self._initialized:
|
||
self._ensure_image_dir()
|
||
|
||
self._initialized = True
|
||
self._llm = LLMRequest(model=global_config.model.vlm, temperature=0.4, max_tokens=300, request_type="image")
|
||
|
||
try:
|
||
db.connect(reuse_if_open=True)
|
||
db.create_tables([Images, ImageDescriptions], safe=True)
|
||
except Exception as e:
|
||
logger.error(f"数据库连接或表创建失败: {e}")
|
||
|
||
self._initialized = True
|
||
|
||
def _ensure_image_dir(self):
|
||
"""确保图像存储目录存在"""
|
||
os.makedirs(self.IMAGE_DIR, exist_ok=True)
|
||
|
||
@staticmethod
|
||
def _get_description_from_db(image_hash: str, description_type: str) -> Optional[str]:
|
||
"""从数据库获取图片描述
|
||
|
||
Args:
|
||
image_hash: 图片哈希值
|
||
description_type: 描述类型 ('emoji' 或 'image')
|
||
|
||
Returns:
|
||
Optional[str]: 描述文本,如果不存在则返回None
|
||
"""
|
||
try:
|
||
record = ImageDescriptions.get_or_none(
|
||
(ImageDescriptions.image_description_hash == image_hash) & (ImageDescriptions.type == description_type)
|
||
)
|
||
return record.description if record else None
|
||
except Exception as e:
|
||
logger.error(f"从数据库获取描述失败 (Peewee): {str(e)}")
|
||
return None
|
||
|
||
@staticmethod
|
||
def _save_description_to_db(image_hash: str, description: str, description_type: str) -> None:
|
||
"""保存图片描述到数据库
|
||
|
||
Args:
|
||
image_hash: 图片哈希值
|
||
description: 描述文本
|
||
description_type: 描述类型 ('emoji' 或 'image')
|
||
"""
|
||
try:
|
||
current_timestamp = time.time()
|
||
defaults = {"description": description, "timestamp": current_timestamp}
|
||
desc_obj, created = ImageDescriptions.get_or_create(
|
||
image_description_hash=image_hash, type=description_type, defaults=defaults
|
||
)
|
||
if not created: # 如果记录已存在,则更新
|
||
desc_obj.description = description
|
||
desc_obj.timestamp = current_timestamp
|
||
desc_obj.save()
|
||
except Exception as e:
|
||
logger.error(f"保存描述到数据库失败 (Peewee): {str(e)}")
|
||
|
||
async def get_emoji_description(self, image_base64: str) -> str:
|
||
"""获取表情包描述,带查重和保存功能"""
|
||
try:
|
||
# 计算图片哈希
|
||
# 确保base64字符串只包含ASCII字符
|
||
if isinstance(image_base64, str):
|
||
image_base64 = image_base64.encode("ascii", errors="ignore").decode("ascii")
|
||
image_bytes = base64.b64decode(image_base64)
|
||
image_hash = hashlib.md5(image_bytes).hexdigest()
|
||
image_format = Image.open(io.BytesIO(image_bytes)).format.lower()
|
||
|
||
# 查询缓存的描述
|
||
cached_description = self._get_description_from_db(image_hash, "emoji")
|
||
if cached_description:
|
||
return f"[表情包,含义看起来是:{cached_description}]"
|
||
|
||
# 调用AI获取描述
|
||
if image_format == "gif" or image_format == "GIF":
|
||
image_base64_processed = self.transform_gif(image_base64)
|
||
if image_base64_processed is None:
|
||
logger.warning("GIF转换失败,无法获取描述")
|
||
return "[表情包(GIF处理失败)]"
|
||
prompt = "这是一个动态图表情包,每一张图代表了动态图的某一帧,黑色背景代表透明,使用1-2个词描述一下表情包表达的情感和内容,简短一些,输出一段平文本,只输出1-2个词就好,不要输出其他内容"
|
||
description, _ = await self._llm.generate_response_for_image(prompt, image_base64_processed, "jpg")
|
||
else:
|
||
prompt = "图片是一个表情包,请用使用1-2个词描述一下表情包所表达的情感和内容,简短一些,输出一段平文本,只输出1-2个词就好,不要输出其他内容"
|
||
description, _ = await self._llm.generate_response_for_image(prompt, image_base64, image_format)
|
||
|
||
if description is None:
|
||
logger.warning("AI未能生成表情包描述")
|
||
return "[表情包(描述生成失败)]"
|
||
|
||
# 再次检查缓存,防止并发写入时重复生成
|
||
cached_description = self._get_description_from_db(image_hash, "emoji")
|
||
if cached_description:
|
||
logger.warning(f"虽然生成了描述,但是找到缓存表情包描述: {cached_description}")
|
||
return f"[表情包,含义看起来是:{cached_description}]"
|
||
|
||
# 根据配置决定是否保存图片
|
||
# if global_config.emoji.save_emoji:
|
||
# 生成文件名和路径
|
||
logger.debug(f"保存表情包: {image_hash}")
|
||
current_timestamp = time.time()
|
||
filename = f"{int(current_timestamp)}_{image_hash[:8]}.{image_format}"
|
||
emoji_dir = os.path.join(self.IMAGE_DIR, "emoji")
|
||
os.makedirs(emoji_dir, exist_ok=True)
|
||
file_path = os.path.join(emoji_dir, filename)
|
||
|
||
try:
|
||
# 保存文件
|
||
with open(file_path, "wb") as f:
|
||
f.write(image_bytes)
|
||
|
||
# 保存到数据库 (Images表)
|
||
try:
|
||
img_obj = Images.get((Images.emoji_hash == image_hash) & (Images.type == "emoji"))
|
||
img_obj.path = file_path
|
||
img_obj.description = description
|
||
img_obj.timestamp = current_timestamp
|
||
img_obj.save()
|
||
except Images.DoesNotExist:
|
||
Images.create(
|
||
emoji_hash=image_hash,
|
||
path=file_path,
|
||
type="emoji",
|
||
description=description,
|
||
timestamp=current_timestamp,
|
||
)
|
||
# logger.debug(f"保存表情包元数据: {file_path}")
|
||
except Exception as e:
|
||
logger.error(f"保存表情包文件或元数据失败: {str(e)}")
|
||
|
||
# 保存描述到数据库 (ImageDescriptions表)
|
||
self._save_description_to_db(image_hash, description, "emoji")
|
||
|
||
return f"[表情包:{description}]"
|
||
except Exception as e:
|
||
logger.error(f"获取表情包描述失败: {str(e)}")
|
||
return "[表情包]"
|
||
|
||
async def get_image_description(self, image_base64: str) -> str:
|
||
"""获取普通图片描述,带查重和保存功能"""
|
||
try:
|
||
# 计算图片哈希
|
||
if isinstance(image_base64, str):
|
||
image_base64 = image_base64.encode("ascii", errors="ignore").decode("ascii")
|
||
image_bytes = base64.b64decode(image_base64)
|
||
image_hash = hashlib.md5(image_bytes).hexdigest()
|
||
|
||
# 检查图片是否已存在
|
||
existing_image = Images.get_or_none(Images.emoji_hash == image_hash)
|
||
if existing_image:
|
||
# 更新计数
|
||
if hasattr(existing_image, "count") and existing_image.count is not None:
|
||
existing_image.count += 1
|
||
else:
|
||
existing_image.count = 1
|
||
existing_image.save()
|
||
|
||
# 如果已有描述,直接返回
|
||
if existing_image.description:
|
||
return f"[图片:{existing_image.description}]"
|
||
|
||
# 查询缓存的描述
|
||
cached_description = self._get_description_from_db(image_hash, "image")
|
||
if cached_description:
|
||
logger.debug(f"图片描述缓存中 {cached_description}")
|
||
return f"[图片:{cached_description}]"
|
||
|
||
# 调用AI获取描述
|
||
image_format = Image.open(io.BytesIO(image_bytes)).format.lower()
|
||
prompt = "请用中文描述这张图片的内容。如果有文字,请把文字都描述出来,请留意其主题,直观感受,输出为一段平文本,最多50字"
|
||
description, _ = await self._llm.generate_response_for_image(prompt, image_base64, image_format)
|
||
|
||
if description is None:
|
||
logger.warning("AI未能生成图片描述")
|
||
return "[图片(描述生成失败)]"
|
||
|
||
# 保存图片和描述
|
||
current_timestamp = time.time()
|
||
filename = f"{int(current_timestamp)}_{image_hash[:8]}.{image_format}"
|
||
image_dir = os.path.join(self.IMAGE_DIR, "image")
|
||
os.makedirs(image_dir, exist_ok=True)
|
||
file_path = os.path.join(image_dir, filename)
|
||
|
||
try:
|
||
# 保存文件
|
||
with open(file_path, "wb") as f:
|
||
f.write(image_bytes)
|
||
|
||
# 保存到数据库,补充缺失字段
|
||
if existing_image:
|
||
existing_image.path = file_path
|
||
existing_image.description = description
|
||
existing_image.timestamp = current_timestamp
|
||
if not hasattr(existing_image, "image_id") or not existing_image.image_id:
|
||
existing_image.image_id = str(uuid.uuid4())
|
||
if not hasattr(existing_image, "vlm_processed") or existing_image.vlm_processed is None:
|
||
existing_image.vlm_processed = True
|
||
existing_image.save()
|
||
else:
|
||
Images.create(
|
||
image_id=str(uuid.uuid4()),
|
||
emoji_hash=image_hash,
|
||
path=file_path,
|
||
type="image",
|
||
description=description,
|
||
timestamp=current_timestamp,
|
||
vlm_processed=True,
|
||
count=1,
|
||
)
|
||
except Exception as e:
|
||
logger.error(f"保存图片文件或元数据失败: {str(e)}")
|
||
|
||
# 保存描述到ImageDescriptions表
|
||
self._save_description_to_db(image_hash, description, "image")
|
||
|
||
return f"[图片:{description}]"
|
||
except Exception as e:
|
||
logger.error(f"获取图片描述失败: {str(e)}")
|
||
return "[图片]"
|
||
|
||
@staticmethod
|
||
def transform_gif(gif_base64: str, similarity_threshold: float = 1000.0, max_frames: int = 15) -> Optional[str]:
|
||
"""将GIF转换为水平拼接的静态图像, 跳过相似的帧
|
||
|
||
Args:
|
||
gif_base64: GIF的base64编码字符串
|
||
similarity_threshold: 判定帧相似的阈值 (MSE),越小表示要求差异越大才算不同帧,默认1000.0
|
||
max_frames: 最大抽取的帧数,默认15
|
||
|
||
Returns:
|
||
Optional[str]: 拼接后的JPG图像的base64编码字符串, 或者在失败时返回None
|
||
"""
|
||
try:
|
||
# 确保base64字符串只包含ASCII字符
|
||
if isinstance(gif_base64, str):
|
||
gif_base64 = gif_base64.encode("ascii", errors="ignore").decode("ascii")
|
||
# 解码base64
|
||
gif_data = base64.b64decode(gif_base64)
|
||
gif = Image.open(io.BytesIO(gif_data))
|
||
|
||
# 收集所有帧
|
||
all_frames = []
|
||
try:
|
||
while True:
|
||
gif.seek(len(all_frames))
|
||
# 确保是RGB格式方便比较
|
||
frame = gif.convert("RGB")
|
||
all_frames.append(frame.copy())
|
||
except EOFError:
|
||
pass # 读完啦
|
||
|
||
if not all_frames:
|
||
logger.warning("GIF中没有找到任何帧")
|
||
return None # 空的GIF直接返回None
|
||
|
||
# --- 新的帧选择逻辑 ---
|
||
selected_frames = []
|
||
last_selected_frame_np = None
|
||
|
||
for i, current_frame in enumerate(all_frames):
|
||
current_frame_np = np.array(current_frame)
|
||
|
||
# 第一帧总是要选的
|
||
if i == 0:
|
||
selected_frames.append(current_frame)
|
||
last_selected_frame_np = current_frame_np
|
||
continue
|
||
|
||
# 计算和上一张选中帧的差异(均方误差 MSE)
|
||
if last_selected_frame_np is not None:
|
||
mse = np.mean((current_frame_np - last_selected_frame_np) ** 2)
|
||
# logger.debug(f"帧 {i} 与上一选中帧的 MSE: {mse}") # 可以取消注释来看差异值
|
||
|
||
# 如果差异够大,就选它!
|
||
if mse > similarity_threshold:
|
||
selected_frames.append(current_frame)
|
||
last_selected_frame_np = current_frame_np
|
||
# 检查是不是选够了
|
||
if len(selected_frames) >= max_frames:
|
||
# logger.debug(f"已选够 {max_frames} 帧,停止选择。")
|
||
break
|
||
# 如果差异不大就跳过这一帧啦
|
||
|
||
# --- 帧选择逻辑结束 ---
|
||
|
||
# 如果选择后连一帧都没有(比如GIF只有一帧且后续处理失败?)或者原始GIF就没帧,也返回None
|
||
if not selected_frames:
|
||
logger.warning("处理后没有选中任何帧")
|
||
return None
|
||
|
||
# logger.debug(f"总帧数: {len(all_frames)}, 选中帧数: {len(selected_frames)}")
|
||
|
||
# 获取选中的第一帧的尺寸(假设所有帧尺寸一致)
|
||
frame_width, frame_height = selected_frames[0].size
|
||
|
||
# 计算目标尺寸,保持宽高比
|
||
target_height = 200 # 固定高度
|
||
# 防止除以零
|
||
if frame_height == 0:
|
||
logger.error("帧高度为0,无法计算缩放尺寸")
|
||
return None
|
||
target_width = int((target_height / frame_height) * frame_width)
|
||
# 宽度也不能是0
|
||
if target_width == 0:
|
||
logger.warning(f"计算出的目标宽度为0 (原始尺寸 {frame_width}x{frame_height}),调整为1")
|
||
target_width = 1
|
||
|
||
# 调整所有选中帧的大小
|
||
resized_frames = [
|
||
frame.resize((target_width, target_height), Image.Resampling.LANCZOS) for frame in selected_frames
|
||
]
|
||
|
||
# 创建拼接图像
|
||
total_width = target_width * len(resized_frames)
|
||
# 防止总宽度为0
|
||
if total_width == 0 and len(resized_frames) > 0:
|
||
logger.warning("计算出的总宽度为0,但有选中帧,可能目标宽度太小")
|
||
# 至少给点宽度吧
|
||
total_width = len(resized_frames)
|
||
elif total_width == 0:
|
||
logger.error("计算出的总宽度为0且无选中帧")
|
||
return None
|
||
|
||
combined_image = Image.new("RGB", (total_width, target_height))
|
||
|
||
# 水平拼接图像
|
||
for idx, frame in enumerate(resized_frames):
|
||
combined_image.paste(frame, (idx * target_width, 0))
|
||
|
||
# 转换为base64
|
||
buffer = io.BytesIO()
|
||
combined_image.save(buffer, format="JPEG", quality=85) # 保存为JPEG
|
||
result_base64 = base64.b64encode(buffer.getvalue()).decode("utf-8")
|
||
|
||
return result_base64
|
||
|
||
except MemoryError:
|
||
logger.error("GIF转换失败: 内存不足,可能是GIF太大或帧数太多")
|
||
return None # 内存不够啦
|
||
except Exception as e:
|
||
logger.error(f"GIF转换失败: {str(e)}", exc_info=True) # 记录详细错误信息
|
||
return None # 其他错误也返回None
|
||
|
||
async def process_image(self, image_base64: str) -> Tuple[str, str]:
|
||
"""处理图片并返回图片ID和描述
|
||
|
||
Args:
|
||
image_base64: 图片的base64编码
|
||
|
||
Returns:
|
||
Tuple[str, str]: (图片ID, 描述)
|
||
"""
|
||
try:
|
||
# 生成图片ID
|
||
# 计算图片哈希
|
||
# 确保base64字符串只包含ASCII字符
|
||
if isinstance(image_base64, str):
|
||
image_base64 = image_base64.encode("ascii", errors="ignore").decode("ascii")
|
||
image_bytes = base64.b64decode(image_base64)
|
||
image_hash = hashlib.md5(image_bytes).hexdigest()
|
||
|
||
# 检查图片是否已存在
|
||
existing_image = Images.get_or_none(Images.emoji_hash == image_hash)
|
||
|
||
if existing_image:
|
||
# 检查是否缺少必要字段,如果缺少则创建新记录
|
||
if (
|
||
not hasattr(existing_image, "image_id")
|
||
or not existing_image.image_id
|
||
or not hasattr(existing_image, "count")
|
||
or existing_image.count is None
|
||
or not hasattr(existing_image, "vlm_processed")
|
||
or existing_image.vlm_processed is None
|
||
):
|
||
logger.debug(f"图片记录缺少必要字段,补全旧记录: {image_hash}")
|
||
if not existing_image.image_id:
|
||
existing_image.image_id = str(uuid.uuid4())
|
||
if existing_image.count is None:
|
||
existing_image.count = 0
|
||
if existing_image.vlm_processed is None:
|
||
existing_image.vlm_processed = False
|
||
|
||
existing_image.count += 1
|
||
existing_image.save()
|
||
return existing_image.image_id, f"[picid:{existing_image.image_id}]"
|
||
else:
|
||
# print(f"图片已存在: {existing_image.image_id}")
|
||
# print(f"图片描述: {existing_image.description}")
|
||
# print(f"图片计数: {existing_image.count}")
|
||
# 更新计数
|
||
existing_image.count += 1
|
||
existing_image.save()
|
||
return existing_image.image_id, f"[picid:{existing_image.image_id}]"
|
||
else:
|
||
# print(f"图片不存在: {image_hash}")
|
||
image_id = str(uuid.uuid4())
|
||
|
||
# 保存新图片
|
||
current_timestamp = time.time()
|
||
image_dir = os.path.join(self.IMAGE_DIR, "images")
|
||
os.makedirs(image_dir, exist_ok=True)
|
||
filename = f"{image_id}.png"
|
||
file_path = os.path.join(image_dir, filename)
|
||
|
||
# 保存文件
|
||
with open(file_path, "wb") as f:
|
||
f.write(image_bytes)
|
||
|
||
# 保存到数据库
|
||
Images.create(
|
||
image_id=image_id,
|
||
emoji_hash=image_hash,
|
||
path=file_path,
|
||
type="image",
|
||
timestamp=current_timestamp,
|
||
vlm_processed=False,
|
||
count=1,
|
||
)
|
||
|
||
# 启动异步VLM处理
|
||
asyncio.create_task(self._process_image_with_vlm(image_id, image_base64))
|
||
|
||
return image_id, f"[picid:{image_id}]"
|
||
|
||
except Exception as e:
|
||
logger.error(f"处理图片失败: {str(e)}")
|
||
return "", "[图片]"
|
||
|
||
async def _process_image_with_vlm(self, image_id: str, image_base64: str) -> None:
|
||
"""使用VLM处理图片并更新数据库
|
||
|
||
Args:
|
||
image_id: 图片ID
|
||
image_base64: 图片的base64编码
|
||
"""
|
||
try:
|
||
# 计算图片哈希
|
||
# 确保base64字符串只包含ASCII字符
|
||
if isinstance(image_base64, str):
|
||
image_base64 = image_base64.encode("ascii", errors="ignore").decode("ascii")
|
||
image_bytes = base64.b64decode(image_base64)
|
||
image_hash = hashlib.md5(image_bytes).hexdigest()
|
||
|
||
# 先检查缓存的描述
|
||
cached_description = self._get_description_from_db(image_hash, "image")
|
||
if cached_description:
|
||
logger.debug(f"VLM处理时发现缓存描述: {cached_description}")
|
||
# 更新数据库
|
||
image = Images.get(Images.image_id == image_id)
|
||
image.description = cached_description
|
||
image.vlm_processed = True
|
||
image.save()
|
||
return
|
||
|
||
# 获取图片格式
|
||
image_format = Image.open(io.BytesIO(image_bytes)).format.lower()
|
||
|
||
# 构建prompt
|
||
prompt = """请用中文描述这张图片的内容。如果有文字,请把文字描述概括出来,请留意其主题,直观感受,输出为一段平文本,最多30字,请注意不要分点,就输出一段文本"""
|
||
|
||
# 获取VLM描述
|
||
description, _ = await self._llm.generate_response_for_image(prompt, image_base64, image_format)
|
||
|
||
if description is None:
|
||
logger.warning("VLM未能生成图片描述")
|
||
description = "无法生成描述"
|
||
|
||
# 再次检查缓存,防止并发写入时重复生成
|
||
cached_description = self._get_description_from_db(image_hash, "image")
|
||
if cached_description:
|
||
logger.warning(f"虽然生成了描述,但是找到缓存图片描述: {cached_description}")
|
||
description = cached_description
|
||
|
||
# 更新数据库
|
||
image = Images.get(Images.image_id == image_id)
|
||
image.description = description
|
||
image.vlm_processed = True
|
||
image.save()
|
||
|
||
# 保存描述到ImageDescriptions表
|
||
self._save_description_to_db(image_hash, description, "image")
|
||
|
||
except Exception as e:
|
||
logger.error(f"VLM处理图片失败: {str(e)}")
|
||
|
||
|
||
# 创建全局单例
|
||
image_manager = None
|
||
|
||
|
||
def get_image_manager() -> ImageManager:
|
||
"""获取全局图片管理器单例"""
|
||
global image_manager
|
||
if image_manager is None:
|
||
image_manager = ImageManager()
|
||
return image_manager
|
||
|
||
|
||
def image_path_to_base64(image_path: str) -> str:
|
||
"""将图片路径转换为base64编码
|
||
Args:
|
||
image_path: 图片文件路径
|
||
Returns:
|
||
str: base64编码的图片数据
|
||
Raises:
|
||
FileNotFoundError: 当图片文件不存在时
|
||
IOError: 当读取图片文件失败时
|
||
"""
|
||
if not os.path.exists(image_path):
|
||
raise FileNotFoundError(f"图片文件不存在: {image_path}")
|
||
|
||
with open(image_path, "rb") as f:
|
||
image_data = f.read()
|
||
if not image_data:
|
||
raise IOError(f"读取图片文件失败: {image_path}")
|
||
return base64.b64encode(image_data).decode("utf-8")
|