diff --git a/src/chat/utils/utils_video.py b/src/chat/utils/utils_video.py index 158b8a706..2f72af32b 100644 --- a/src/chat/utils/utils_video.py +++ b/src/chat/utils/utils_video.py @@ -30,6 +30,8 @@ from src.common.logger import get_logger from src.config.config import global_config, model_config from src.llm_models.utils_model import LLMRequest from src.common.database.sqlalchemy_models import Videos, get_db_session # type: ignore +from sqlalchemy import select, update, insert # type: ignore +from sqlalchemy import exc as sa_exc # type: ignore # 简易并发控制:同一 hash 只处理一次 _video_locks: Dict[str, asyncio.Lock] = {} @@ -200,17 +202,11 @@ class VideoAnalyzer: q = prompt if prompt is not None else question video_hash = hashlib.sha256(video_bytes).hexdigest() - # 查缓存 - try: - async with get_db_session() as session: # type: ignore - row = await session.execute( - Videos.__table__.select().where(Videos.video_hash == video_hash) # type: ignore - ) - existing = row.first() - if existing and existing[Videos.description] and existing[Videos.vlm_processed]: # type: ignore - return {"summary": existing[Videos.description]} # type: ignore - except Exception: # pragma: no cover - pass + # 查缓存(第一次,未加锁) + cached = await self._get_cached(video_hash) + if cached: + logger.info(f"视频缓存命中(预检查) hash={video_hash[:16]}") + return {"summary": cached} # 获取锁避免重复处理 async with _locks_guard: @@ -219,17 +215,11 @@ class VideoAnalyzer: lock = asyncio.Lock() _video_locks[video_hash] = lock async with lock: - # 双检:进入锁后再查一次,避免重复处理 - try: - async with get_db_session() as session: # type: ignore - row = await session.execute( - Videos.__table__.select().where(Videos.video_hash == video_hash) # type: ignore - ) - existing = row.first() - if existing and existing[Videos.description] and existing[Videos.vlm_processed]: # type: ignore - return {"summary": existing[Videos.description]} # type: ignore - except Exception: # pragma: no cover - pass + # 双检缓存 + cached2 = await self._get_cached(video_hash) + if cached2: + logger.info(f"视频缓存命中(锁后) hash={video_hash[:16]}") + return {"summary": cached2} try: with tempfile.NamedTemporaryFile(delete=False) as fp: @@ -239,26 +229,7 @@ class VideoAnalyzer: ok, summary = await self.analyze_video(temp_path, q) # 写入缓存(仅成功) if ok: - try: - async with get_db_session() as session: # type: ignore - await session.execute( - Videos.__table__.insert().values( - video_id="", - video_hash=video_hash, - description=summary, - count=1, - timestamp=time.time(), - vlm_processed=True, - duration=None, - frame_count=None, - fps=None, - resolution=None, - file_size=len(video_bytes), - ) - ) - await session.commit() - except Exception: # pragma: no cover - pass + await self._save_cache(video_hash, summary, len(video_bytes)) return {"summary": summary} finally: if os.path.exists(temp_path): @@ -269,6 +240,54 @@ class VideoAnalyzer: except Exception as e: # pragma: no cover return {"summary": f"❌ 处理失败: {e}"} + # ---- 缓存辅助 ---- + async def _get_cached(self, video_hash: str) -> Optional[str]: + try: + async with get_db_session() as session: # type: ignore + result = await session.execute(select(Videos).where(Videos.video_hash == video_hash)) # type: ignore + obj: Optional[Videos] = result.scalar_one_or_none() # type: ignore + if obj and obj.vlm_processed and obj.description: + # 更新使用次数 + try: + await session.execute( + update(Videos) + .where(Videos.id == obj.id) # type: ignore + .values(count=obj.count + 1 if obj.count is not None else 1) + ) + await session.commit() + except Exception: # pragma: no cover + await session.rollback() + return obj.description + except Exception: # pragma: no cover + pass + return None + + async def _save_cache(self, video_hash: str, summary: str, file_size: int) -> None: + try: + async with get_db_session() as session: # type: ignore + stmt = insert(Videos).values( # type: ignore + video_id="", + video_hash=video_hash, + description=summary, + count=1, + timestamp=time.time(), + vlm_processed=True, + duration=None, + frame_count=None, + fps=None, + resolution=None, + file_size=file_size, + ) + try: + await session.execute(stmt) + await session.commit() + logger.debug(f"视频缓存写入 success hash={video_hash}") + except sa_exc.IntegrityError: # 可能并发已写入 + await session.rollback() + logger.debug(f"视频缓存已存在 hash={video_hash}") + except Exception: # pragma: no cover + logger.debug("视频缓存写入失败") + # ---- 外部接口 ---- _INSTANCE: Optional[VideoAnalyzer] = None