refactor(maizone_refactored): 提取Cookie管理到独立的CookieService

将原先在QZoneService中实现的Cookie更新与加载逻辑,抽象并重构为一个独立的CookieService。这提高了代码的模块化和可复用性,使得Cookie管理逻辑更加清晰和集中。

- 新增 `CookieService` 用于统一处理Cookie的获取、更新和本地缓存。
- 在主插件中注入 `CookieService` 到 `QZoneService`。
- `QZoneService` 现在依赖 `CookieService` 来获取Cookie,移除了内部的实现细节。
- 新增了备用Cookie获取服务的相关配置项。

Co-authored-by: TT-P607
<TT-P607@users.noreply.github.com>
This commit is contained in:
minecraft1024a
2025-08-17 12:34:19 +08:00
parent 55c9dbcb16
commit d00251352f
3 changed files with 132 additions and 34 deletions

View File

@@ -23,6 +23,7 @@ from .services.image_service import ImageService
from .services.qzone_service import QZoneService
from .services.scheduler_service import SchedulerService
from .services.monitor_service import MonitorService
from .services.cookie_service import CookieService
from .services.manager import register_service
logger = get_logger("MaiZone.Plugin")
@@ -68,6 +69,10 @@ class MaiZoneRefactoredPlugin(BasePlugin):
"schedule": {
"enable_schedule": ConfigField(type=bool, default=False, description="是否启用定时发送"),
},
"cookie": {
"http_fallback_host": ConfigField(type=str, default="127.0.0.1", description="备用Cookie获取服务的主机地址"),
"http_fallback_port": ConfigField(type=int, default=8080, description="备用Cookie获取服务的端口"),
},
}
def __init__(self, *args, **kwargs):
@@ -75,7 +80,8 @@ class MaiZoneRefactoredPlugin(BasePlugin):
content_service = ContentService(self.get_config)
image_service = ImageService(self.get_config)
qzone_service = QZoneService(self.get_config, content_service, image_service)
cookie_service = CookieService(self.get_config)
qzone_service = QZoneService(self.get_config, content_service, image_service, cookie_service)
scheduler_service = SchedulerService(self.get_config, qzone_service)
monitor_service = MonitorService(self.get_config, qzone_service)

View File

@@ -0,0 +1,120 @@
# -*- coding: utf-8 -*-
"""
Cookie服务模块
负责从多种来源获取、缓存和管理QZone的Cookie。
"""
import json
from pathlib import Path
from typing import Callable, Optional, Dict
import aiohttp
from src.common.logger import get_logger
from src.plugin_system.apis import send_api
logger = get_logger("MaiZone.CookieService")
class CookieService:
"""
管理Cookie的获取和缓存支持多种获取策略。
"""
def __init__(self, get_config: Callable):
self.get_config = get_config
self.cookie_dir = Path(__file__).resolve().parent.parent / "cookies"
self.cookie_dir.mkdir(exist_ok=True)
def _get_cookie_file_path(self, qq_account: str) -> Path:
"""获取指定QQ账号的cookie文件路径"""
return self.cookie_dir / f"cookies-{qq_account}.json"
def _save_cookies_to_file(self, qq_account: str, cookies: Dict[str, str]):
"""将Cookie保存到本地文件"""
cookie_file_path = self._get_cookie_file_path(qq_account)
try:
with open(cookie_file_path, "w", encoding="utf-8") as f:
json.dump(cookies, f)
logger.info(f"Cookie已成功缓存至: {cookie_file_path}")
except IOError as e:
logger.error(f"无法写入Cookie文件 {cookie_file_path}: {e}")
def _load_cookies_from_file(self, qq_account: str) -> Optional[Dict[str, str]]:
"""从本地文件加载Cookie"""
cookie_file_path = self._get_cookie_file_path(qq_account)
if cookie_file_path.exists():
try:
with open(cookie_file_path, "r", encoding="utf-8") as f:
return json.load(f)
except (IOError, json.JSONDecodeError) as e:
logger.error(f"无法读取或解析Cookie文件 {cookie_file_path}: {e}")
return None
async def _get_cookies_from_adapter(self, stream_id: Optional[str]) -> Optional[Dict[str, str]]:
"""通过Adapter API获取Cookie"""
try:
params = {"domain": "user.qzone.qq.com"}
if stream_id:
response = await send_api.adapter_command_to_stream(action="get_cookies", params=params, platform="qq", stream_id=stream_id, timeout=40.0)
else:
response = await send_api.adapter_command_to_stream(action="get_cookies", params=params, platform="qq", timeout=40.0)
if response.get("status") == "ok":
cookie_str = response.get("data", {}).get("cookies", "")
if cookie_str:
return {k.strip(): v.strip() for k, v in (p.split('=', 1) for p in cookie_str.split('; ') if '=' in p)}
except Exception as e:
logger.error(f"通过Adapter获取Cookie时发生异常: {e}")
return None
async def _get_cookies_from_http(self) -> Optional[Dict[str, str]]:
"""通过备用HTTP端点获取Cookie"""
host = self.get_config("cookie.http_fallback_host")
port = self.get_config("cookie.http_fallback_port")
if not host or not port:
return None
http_url = f"http://{host}:{port}/get_cookies"
try:
timeout = aiohttp.ClientTimeout(total=15)
async with aiohttp.ClientSession() as session:
async with session.get(http_url, timeout=timeout) as response:
response.raise_for_status()
# 假设API直接返回JSON格式的cookie
return await response.json()
except Exception as e:
logger.error(f"通过HTTP备用地址 {http_url} 获取Cookie失败: {e}")
return None
async def get_cookies(self, qq_account: str, stream_id: Optional[str]) -> Optional[Dict[str, str]]:
"""
获取Cookie按以下顺序尝试
1. Adapter API
2. HTTP备用端点
3. 本地文件缓存
"""
# 1. 尝试从Adapter获取
cookies = await self._get_cookies_from_adapter(stream_id)
if cookies:
logger.info("成功从Adapter获取Cookie。")
self._save_cookies_to_file(qq_account, cookies)
return cookies
# 2. 尝试从HTTP备用端点获取
logger.warning("从Adapter获取Cookie失败尝试使用HTTP备用地址。")
cookies = await self._get_cookies_from_http()
if cookies:
logger.info("成功从HTTP备用地址获取Cookie。")
self._save_cookies_to_file(qq_account, cookies)
return cookies
# 3. 尝试从本地文件加载
logger.warning("从HTTP备用地址获取Cookie失败尝试加载本地缓存。")
cookies = self._load_cookies_from_file(qq_account)
if cookies:
logger.info("成功从本地文件加载缓存的Cookie。")
return cookies
logger.error("所有Cookie获取方法均失败。")
return None

View File

@@ -9,7 +9,6 @@ import json
import os
import random
import time
from pathlib import Path
from typing import Callable, Optional, Dict, Any, List, Tuple
import aiohttp
@@ -17,10 +16,11 @@ import bs4
import json5
from src.chat.utils.utils_image import get_image_manager
from src.common.logger import get_logger
from src.plugin_system.apis import send_api, config_api, person_api
from src.plugin_system.apis import config_api, person_api
from .content_service import ContentService
from .image_service import ImageService
from .cookie_service import CookieService
logger = get_logger("MaiZone.QZoneService")
@@ -38,10 +38,11 @@ class QZoneService:
REPLY_URL = "https://user.qzone.qq.com/proxy/domain/taotao.qzone.qq.com/cgi-bin/emotion_cgi_re_feeds"
def __init__(self, get_config: Callable, content_service: ContentService, image_service: ImageService):
def __init__(self, get_config: Callable, content_service: ContentService, image_service: ImageService, cookie_service: CookieService):
self.get_config = get_config
self.content_service = content_service
self.image_service = image_service
self.cookie_service = cookie_service
# --- Public Methods (High-Level Business Logic) ---
@@ -225,37 +226,8 @@ class QZoneService:
hash_val += (hash_val << 5) + ord(char)
return str(hash_val & 2147483647)
async def _renew_and_load_cookies(self, qq_account: str, stream_id: Optional[str]) -> Optional[Dict[str, str]]:
cookie_dir = Path(__file__).resolve().parent.parent / "cookies"
cookie_dir.mkdir(exist_ok=True)
cookie_file_path = cookie_dir / f"cookies-{qq_account}.json"
try:
params = {"domain": "user.qzone.qq.com"}
if stream_id:
response = await send_api.adapter_command_to_stream(action="get_cookies", params=params, platform="qq", stream_id=stream_id, timeout=40.0)
else:
response = await send_api.adapter_command_to_stream(action="get_cookies", params=params, platform="qq", timeout=40.0)
if response.get("status") == "ok":
cookie_str = response.get("data", {}).get("cookies", "")
if cookie_str:
parsed_cookies = {k.strip(): v.strip() for k, v in (p.split('=', 1) for p in cookie_str.split('; ') if '=' in p)}
with open(cookie_file_path, "w", encoding="utf-8") as f:
json.dump(parsed_cookies, f)
logger.info(f"Cookie已更新并保存至: {cookie_file_path}")
return parsed_cookies
if cookie_file_path.exists():
with open(cookie_file_path, "r", encoding="utf-8") as f:
return json.load(f)
return None
except Exception as e:
logger.error(f"更新或加载Cookie时发生异常: {e}")
return None
async def _get_api_client(self, qq_account: str, stream_id: Optional[str]) -> Optional[Dict]:
cookies = await self._renew_and_load_cookies(qq_account, stream_id)
cookies = await self.cookie_service.get_cookies(qq_account, stream_id)
if not cookies: return None
p_skey = cookies.get('p_skey') or cookies.get('p_skey'.upper())