fix: 为没有hash的表情包添加hash,修复set reply

This commit is contained in:
tcmofashi
2025-03-12 21:30:38 +08:00
parent 3857642203
commit 588aecd0f3
3 changed files with 208 additions and 253 deletions

View File

@@ -13,20 +13,22 @@ from nonebot import get_driver
from ...common.database import Database
from ..chat.config import global_config
from ..models.utils_model import LLM_request
driver = get_driver()
config = driver.config
class ImageManager:
_instance = None
IMAGE_DIR = "data" # 图像存储根目录
def __new__(cls):
if cls._instance is None:
cls._instance = super().__new__(cls)
cls._instance.db = None
cls._instance._initialized = False
return cls._instance
def __init__(self):
if not self._initialized:
self.db = Database.get_instance()
@@ -35,68 +37,62 @@ class ImageManager:
self._ensure_image_dir()
self._initialized = True
self._llm = LLM_request(model=global_config.vlm, temperature=0.4, max_tokens=300)
def _ensure_image_dir(self):
"""确保图像存储目录存在"""
os.makedirs(self.IMAGE_DIR, exist_ok=True)
def _ensure_image_collection(self):
"""确保images集合存在并创建索引"""
if 'images' not in self.db.list_collection_names():
self.db.create_collection('images')
if "images" not in self.db.list_collection_names():
self.db.create_collection("images")
# 创建索引
self.db.images.create_index([('hash', 1)], unique=True)
self.db.images.create_index([('url', 1)])
self.db.images.create_index([('path', 1)])
self.db.images.create_index([("hash", 1)], unique=True)
self.db.images.create_index([("url", 1)])
self.db.images.create_index([("path", 1)])
def _ensure_description_collection(self):
"""确保image_descriptions集合存在并创建索引"""
if 'image_descriptions' not in self.db.list_collection_names():
self.db.create_collection('image_descriptions')
if "image_descriptions" not in self.db.list_collection_names():
self.db.create_collection("image_descriptions")
# 创建索引
self.db.image_descriptions.create_index([('hash', 1)], unique=True)
self.db.image_descriptions.create_index([('type', 1)])
self.db.image_descriptions.create_index([("hash", 1)], unique=True)
self.db.image_descriptions.create_index([("type", 1)])
def _get_description_from_db(self, image_hash: str, description_type: str) -> Optional[str]:
"""从数据库获取图片描述
Args:
image_hash: 图片哈希值
description_type: 描述类型 ('emoji''image')
Returns:
Optional[str]: 描述文本如果不存在则返回None
"""
result= self.db.image_descriptions.find_one({
'hash': image_hash,
'type': description_type
})
return result['description'] if result else None
if image_hash is None:
return
result = self.db.image_descriptions.find_one({"hash": image_hash, "type": description_type})
return result["description"] if result else None
def _save_description_to_db(self, image_hash: str, description: str, description_type: str) -> None:
"""保存图片描述到数据库
Args:
image_hash: 图片哈希值
description: 描述文本
description_type: 描述类型 ('emoji''image')
"""
if image_hash is None:
return
self.db.image_descriptions.update_one(
{'hash': image_hash, 'type': description_type},
{
'$set': {
'description': description,
'timestamp': int(time.time())
}
},
upsert=True
{"hash": image_hash, "type": description_type},
{"$set": {"description": description, "timestamp": int(time.time())}},
upsert=True,
)
async def save_image(self,
image_data: Union[str, bytes],
url: str = None,
description: str = None,
is_base64: bool = False) -> Optional[str]:
async def save_image(
self, image_data: Union[str, bytes], url: str = None, description: str = None, is_base64: bool = False
) -> Optional[str]:
"""保存图像
Args:
image_data: 图像数据(base64字符串或字节)
@@ -118,41 +114,41 @@ class ImageManager:
image_bytes = image_data
else:
return None
# 计算哈希值
image_hash = hashlib.md5(image_bytes).hexdigest()
image_format = Image.open(io.BytesIO(image_bytes)).format.lower()
# 查重
existing = self.db.images.find_one({'hash': image_hash})
existing = self.db.images.find_one({"hash": image_hash})
if existing:
return existing['path']
return existing["path"]
# 生成文件名和路径
timestamp = int(time.time())
filename = f"{timestamp}_{image_hash[:8]}.{image_format}"
file_path = os.path.join(self.IMAGE_DIR, filename)
# 保存文件
with open(file_path, "wb") as f:
f.write(image_bytes)
# 保存到数据库
image_doc = {
'hash': image_hash,
'path': file_path,
'url': url,
'description': description,
'timestamp': timestamp
"hash": image_hash,
"path": file_path,
"url": url,
"description": description,
"timestamp": timestamp,
}
self.db.images.insert_one(image_doc)
return file_path
except Exception as e:
logger.error(f"保存图像失败: {str(e)}")
return None
async def get_image_by_url(self, url: str) -> Optional[str]:
"""根据URL获取图像路径(带查重)
Args:
@@ -162,10 +158,10 @@ class ImageManager:
"""
try:
# 先查找是否已存在
existing = self.db.images.find_one({'url': url})
existing = self.db.images.find_one({"url": url})
if existing:
return existing['path']
return existing["path"]
# 下载图像
async with aiohttp.ClientSession() as session:
async with session.get(url) as resp:
@@ -173,11 +169,11 @@ class ImageManager:
image_bytes = await resp.read()
return await self.save_image(image_bytes, url=url)
return None
except Exception as e:
logger.error(f"获取图像失败: {str(e)}")
return None
async def get_base64_by_url(self, url: str) -> Optional[str]:
"""根据URL获取base64(带查重)
Args:
@@ -189,16 +185,15 @@ class ImageManager:
image_path = await self.get_image_by_url(url)
if not image_path:
return None
with open(image_path, 'rb') as f:
with open(image_path, "rb") as f:
image_bytes = f.read()
return base64.b64encode(image_bytes).decode('utf-8')
return base64.b64encode(image_bytes).decode("utf-8")
except Exception as e:
logger.error(f"获取base64失败: {str(e)}")
return None
def check_url_exists(self, url: str) -> bool:
"""检查URL是否已存在
Args:
@@ -206,8 +201,8 @@ class ImageManager:
Returns:
bool: 是否存在
"""
return self.db.images.find_one({'url': url}) is not None
return self.db.images.find_one({"url": url}) is not None
def check_hash_exists(self, image_data: Union[str, bytes], is_base64: bool = False) -> bool:
"""检查图像是否已存在
Args:
@@ -227,14 +222,14 @@ class ImageManager:
image_bytes = image_data
else:
return False
image_hash = hashlib.md5(image_bytes).hexdigest()
return self.db.images.find_one({'hash': image_hash}) is not None
return self.db.images.find_one({"hash": image_hash}) is not None
except Exception as e:
logger.error(f"检查哈希失败: {str(e)}")
return False
async def get_emoji_description(self, image_base64: str) -> str:
"""获取表情包描述,带查重和保存功能"""
try:
@@ -244,7 +239,7 @@ class ImageManager:
image_format = Image.open(io.BytesIO(image_bytes)).format.lower()
# 查询缓存的描述
cached_description = self._get_description_from_db(image_hash, 'emoji')
cached_description = self._get_description_from_db(image_hash, "emoji")
if cached_description:
logger.info(f"缓存表情包描述: {cached_description}")
return f"[表情包:{cached_description}]"
@@ -252,39 +247,35 @@ class ImageManager:
# 调用AI获取描述
prompt = "这是一个表情包,使用中文简洁的描述一下表情包的内容和表情包所表达的情感"
description, _ = await self._llm.generate_response_for_image(prompt, image_base64, image_format)
# 根据配置决定是否保存图片
if global_config.EMOJI_SAVE:
# 生成文件名和路径
timestamp = int(time.time())
filename = f"{timestamp}_{image_hash[:8]}.{image_format}"
file_path = os.path.join(self.IMAGE_DIR, 'emoji',filename)
file_path = os.path.join(self.IMAGE_DIR, "emoji", filename)
try:
# 保存文件
with open(file_path, "wb") as f:
f.write(image_bytes)
# 保存到数据库
image_doc = {
'hash': image_hash,
'path': file_path,
'type': 'emoji',
'description': description,
'timestamp': timestamp
"hash": image_hash,
"path": file_path,
"type": "emoji",
"description": description,
"timestamp": timestamp,
}
self.db.images.update_one(
{'hash': image_hash},
{'$set': image_doc},
upsert=True
)
self.db.images.update_one({"hash": image_hash}, {"$set": image_doc}, upsert=True)
logger.success(f"保存表情包: {file_path}")
except Exception as e:
logger.error(f"保存表情包文件失败: {str(e)}")
# 保存描述到数据库
self._save_description_to_db(image_hash, description, 'emoji')
self._save_description_to_db(image_hash, description, "emoji")
return f"[表情包:{description}]"
except Exception as e:
logger.error(f"获取表情包描述失败: {str(e)}")
@@ -300,60 +291,57 @@ class ImageManager:
image_format = Image.open(io.BytesIO(image_bytes)).format.lower()
# 查询缓存的描述
cached_description = self._get_description_from_db(image_hash, 'image')
cached_description = self._get_description_from_db(image_hash, "image")
if cached_description:
print("图片描述缓存中")
return f"[图片:{cached_description}]"
# 调用AI获取描述
prompt = "请用中文描述这张图片的内容。如果有文字请把文字都描述出来。并尝试猜测这个图片的含义。最多200个字。"
prompt = (
"请用中文描述这张图片的内容。如果有文字请把文字都描述出来。并尝试猜测这个图片的含义。最多200个字。"
)
description, _ = await self._llm.generate_response_for_image(prompt, image_base64, image_format)
print(f"描述是{description}")
if description is None:
logger.warning("AI未能生成图片描述")
return "[图片]"
# 根据配置决定是否保存图片
if global_config.EMOJI_SAVE:
# 生成文件名和路径
timestamp = int(time.time())
filename = f"{timestamp}_{image_hash[:8]}.{image_format}"
file_path = os.path.join(self.IMAGE_DIR,'image', filename)
file_path = os.path.join(self.IMAGE_DIR, "image", filename)
try:
# 保存文件
with open(file_path, "wb") as f:
f.write(image_bytes)
# 保存到数据库
image_doc = {
'hash': image_hash,
'path': file_path,
'type': 'image',
'description': description,
'timestamp': timestamp
"hash": image_hash,
"path": file_path,
"type": "image",
"description": description,
"timestamp": timestamp,
}
self.db.images.update_one(
{'hash': image_hash},
{'$set': image_doc},
upsert=True
)
self.db.images.update_one({"hash": image_hash}, {"$set": image_doc}, upsert=True)
logger.success(f"保存图片: {file_path}")
except Exception as e:
logger.error(f"保存图片文件失败: {str(e)}")
# 保存描述到数据库
self._save_description_to_db(image_hash, description, 'image')
self._save_description_to_db(image_hash, description, "image")
return f"[图片:{description}]"
except Exception as e:
logger.error(f"获取图片描述失败: {str(e)}")
return "[图片]"
# 创建全局单例
image_manager = ImageManager()
@@ -366,9 +354,9 @@ def image_path_to_base64(image_path: str) -> str:
str: base64编码的图片数据
"""
try:
with open(image_path, 'rb') as f:
with open(image_path, "rb") as f:
image_data = f.read()
return base64.b64encode(image_data).decode('utf-8')
return base64.b64encode(image_data).decode("utf-8")
except Exception as e:
logger.error(f"读取图片失败: {image_path}, 错误: {str(e)}")
return None
return None