完善本地图片配图功能并优化图片上传机制

This commit is contained in:
tt-P607
2025-08-20 19:22:42 +08:00
parent 5265132cb6
commit facbb3495f
2 changed files with 202 additions and 21 deletions

View File

@@ -50,7 +50,8 @@ class MaiZoneRefactoredPlugin(BasePlugin):
"enable_ai_image": ConfigField(type=bool, default=False, description="是否启用AI生成配图"),
"enable_reply": ConfigField(type=bool, default=True, description="完成后是否回复"),
"ai_image_number": ConfigField(type=int, default=1, description="AI生成图片数量"),
"image_directory": ConfigField(type=str, default="./data/plugins/maizone_refactored/images", description="图片存储目录")
"image_number": ConfigField(type=int, default=1, description="本地配图数量1-9张"),
"image_directory": ConfigField(type=str, default="./data/images", description="图片存储目录")
},
"read": {
"permission": ConfigField(type=list, default=[], description="阅读权限QQ号列表"),

View File

@@ -9,6 +9,7 @@ import json
import os
import random
import time
import base64
from pathlib import Path
from typing import Callable, Optional, Dict, Any, List, Tuple
@@ -217,17 +218,52 @@ class QZoneService:
await api_client["like"](target_qq, fid)
def _load_local_images(self, image_dir: str) -> List[bytes]:
"""随机加载本地图片(不删除文件)"""
images = []
if not os.path.exists(image_dir):
logger.warning(f"图片目录不存在: {image_dir}")
return images
try:
files = sorted([f for f in os.listdir(image_dir) if os.path.isfile(os.path.join(image_dir, f))])
for filename in files:
# 获取所有图片文件
all_files = [f for f in os.listdir(image_dir)
if os.path.isfile(os.path.join(image_dir, f))
and f.lower().endswith(('.jpg', '.jpeg', '.png', '.gif', '.bmp'))]
if not all_files:
logger.warning(f"图片目录中没有找到图片文件: {image_dir}")
return images
# 检查是否启用配图
enable_image = bool(self.get_config("send.enable_image", False))
if not enable_image:
logger.info("说说配图功能已关闭")
return images
# 根据配置选择图片数量
config_image_number = self.get_config("send.image_number", 1)
try:
config_image_number = int(config_image_number)
except (ValueError, TypeError):
config_image_number = 1
logger.warning(f"配置项 image_number 值无效,使用默认值 1")
max_images = min(min(config_image_number, 9), len(all_files)) # 最多9张最少1张
selected_count = max(1, max_images) # 确保至少选择1张
selected_files = random.sample(all_files, selected_count)
logger.info(f"{len(all_files)} 张图片中随机选择了 {selected_count} 张配图")
for filename in selected_files:
full_path = os.path.join(image_dir, filename)
with open(full_path, "rb") as f:
images.append(f.read())
os.remove(full_path)
try:
with open(full_path, "rb") as f:
image_data = f.read()
images.append(image_data)
logger.info(f"加载图片: {filename} ({len(image_data)} bytes)")
except Exception as e:
logger.error(f"加载图片 {filename} 失败: {e}")
return images
except Exception as e:
logger.error(f"加载本地图片失败: {e}")
@@ -353,31 +389,175 @@ class QZoneService:
"format": "json",
"qzreferrer": f"https://user.qzone.qq.com/{uin}",
}
# 处理图片上传
if images:
pic_bos, richvals = [], [] # noqa: F841
# The original logic for uploading images is complex and involves multiple steps.
# This simplified version captures the essence. A full implementation would require
# a separate, robust image upload function.
for _img_bytes in images:
# This is a placeholder for the actual image upload logic which is quite complex.
# In a real scenario, you would call a dedicated `_upload_image` method here.
# For now, we assume the upload is successful and we get back dummy data.
pass # Simplified for this example
# Dummy data for illustration
if images:
post_data["pic_bo"] = "dummy_pic_bo"
post_data["richtype"] = "1"
post_data["richval"] = "dummy_rich_val"
logger.info(f"开始上传 {len(images)} 张图片...")
pic_bos = []
richvals = []
for i, img_bytes in enumerate(images):
try:
# 上传图片到QQ空间
upload_result = await _upload_image(img_bytes, i)
if upload_result:
pic_bos.append(upload_result["pic_bo"])
richvals.append(upload_result["richval"])
logger.info(f"图片 {i+1} 上传成功")
else:
logger.error(f"图片 {i+1} 上传失败")
except Exception as e:
logger.error(f"上传图片 {i+1} 时发生异常: {e}")
if pic_bos and richvals:
# 完全按照原版格式设置图片参数
post_data['pic_bo'] = ','.join(pic_bos)
post_data['richtype'] = '1'
post_data['richval'] = '\t'.join(richvals) # 原版使用制表符分隔
logger.info(f"准备发布带图说说: {len(pic_bos)} 张图片")
logger.info(f"pic_bo参数: {post_data['pic_bo']}")
logger.info(f"richval参数长度: {len(post_data['richval'])} 字符")
else:
logger.warning("所有图片上传失败,将发布纯文本说说")
res_text = await _request("POST", self.EMOTION_PUBLISH_URL, params={"g_tk": gtk}, data=post_data)
result = json.loads(res_text)
tid = result.get("tid", "")
if tid:
if images and pic_bos:
logger.info(f"成功发布带图说说tid: {tid},包含 {len(pic_bos)} 张图片")
else:
logger.info(f"成功发布文本说说tid: {tid}")
else:
logger.error(f"发布说说失败API返回: {result}")
return bool(tid), tid
except Exception as e:
logger.error(f"发布说说异常: {e}", exc_info=True)
return False, ""
def _image_to_base64(image_bytes: bytes) -> str:
"""将图片字节转换为base64字符串仿照原版实现"""
pic_base64 = base64.b64encode(image_bytes)
return str(pic_base64)[2:-1] # 去掉 b'...' 的前缀和后缀
def _get_picbo_and_richval(upload_result: dict) -> tuple:
"""从上传结果中提取图片的picbo和richval值仿照原版实现"""
json_data = upload_result
if 'ret' not in json_data:
raise Exception("获取图片picbo和richval失败")
if json_data['ret'] != 0:
raise Exception("上传图片失败")
# 从URL中提取bo参数
picbo_spt = json_data['data']['url'].split('&bo=')
if len(picbo_spt) < 2:
raise Exception("上传图片失败")
picbo = picbo_spt[1]
# 构造richval - 完全按照原版格式
richval = ",{},{},{},{},{},{},,{},{}".format(
json_data['data']['albumid'],
json_data['data']['lloc'],
json_data['data']['sloc'],
json_data['data']['type'],
json_data['data']['height'],
json_data['data']['width'],
json_data['data']['height'],
json_data['data']['width']
)
return picbo, richval
async def _upload_image(image_bytes: bytes, index: int) -> Optional[Dict[str, str]]:
"""上传图片到QQ空间完全按照原版实现"""
try:
upload_url = "https://up.qzone.qq.com/cgi-bin/upload/cgi_upload_image"
# 完全按照原版构建请求数据
post_data = {
"filename": "filename",
"zzpanelkey": "",
"uploadtype": "1",
"albumtype": "7",
"exttype": "0",
"skey": cookies.get("skey", ""),
"zzpaneluin": uin,
"p_uin": uin,
"uin": uin,
"p_skey": cookies.get('p_skey', ''),
"output_type": "json",
"qzonetoken": "",
"refer": "shuoshuo",
"charset": "utf-8",
"output_charset": "utf-8",
"upload_hd": "1",
"hd_width": "2048",
"hd_height": "10000",
"hd_quality": "96",
"backUrls": "http://upbak.photo.qzone.qq.com/cgi-bin/upload/cgi_upload_image,"
"http://119.147.64.75/cgi-bin/upload/cgi_upload_image",
"url": f"https://up.qzone.qq.com/cgi-bin/upload/cgi_upload_image?g_tk={gtk}",
"base64": "1",
"picfile": _image_to_base64(image_bytes),
}
headers = {
'referer': f'https://user.qzone.qq.com/{uin}',
'origin': 'https://user.qzone.qq.com'
}
logger.info(f"开始上传图片 {index+1}...")
async with aiohttp.ClientSession(cookies=cookies) as session:
timeout = aiohttp.ClientTimeout(total=60)
async with session.post(
upload_url,
data=post_data,
headers=headers,
timeout=timeout
) as response:
if response.status == 200:
resp_text = await response.text()
logger.info(f"图片上传响应状态码: {response.status}")
logger.info(f"图片上传响应内容前500字符: {resp_text[:500]}")
# 按照原版方式解析响应
start_idx = resp_text.find('{')
end_idx = resp_text.rfind('}') + 1
if start_idx != -1 and end_idx != -1:
json_str = resp_text[start_idx:end_idx]
upload_result = eval(json_str) # 与原版保持一致使用eval
logger.info(f"图片上传解析结果: {upload_result}")
if upload_result.get('ret') == 0:
# 使用原版的参数提取逻辑
picbo, richval = _get_picbo_and_richval(upload_result)
logger.info(f"图片 {index+1} 上传成功: picbo={picbo}")
return {
"pic_bo": picbo,
"richval": richval
}
else:
logger.error(f"图片 {index+1} 上传失败: {upload_result}")
return None
else:
logger.error("无法解析上传响应")
return None
else:
error_text = await response.text()
logger.error(f"图片上传HTTP请求失败状态码: {response.status}, 响应: {error_text[:200]}")
return None
except Exception as e:
logger.error(f"上传图片 {index+1} 异常: {e}", exc_info=True)
return None
async def _list_feeds(t_qq: str, num: int) -> List[Dict]:
"""获取指定用户说说列表"""
try: