Update utils_video.py
This commit is contained in:
@@ -30,6 +30,8 @@ from src.common.logger import get_logger
|
||||
from src.config.config import global_config, model_config
|
||||
from src.llm_models.utils_model import LLMRequest
|
||||
from src.common.database.sqlalchemy_models import Videos, get_db_session # type: ignore
|
||||
from sqlalchemy import select, update, insert # type: ignore
|
||||
from sqlalchemy import exc as sa_exc # type: ignore
|
||||
|
||||
# 简易并发控制:同一 hash 只处理一次
|
||||
_video_locks: Dict[str, asyncio.Lock] = {}
|
||||
@@ -200,17 +202,11 @@ class VideoAnalyzer:
|
||||
q = prompt if prompt is not None else question
|
||||
video_hash = hashlib.sha256(video_bytes).hexdigest()
|
||||
|
||||
# 查缓存
|
||||
try:
|
||||
async with get_db_session() as session: # type: ignore
|
||||
row = await session.execute(
|
||||
Videos.__table__.select().where(Videos.video_hash == video_hash) # type: ignore
|
||||
)
|
||||
existing = row.first()
|
||||
if existing and existing[Videos.description] and existing[Videos.vlm_processed]: # type: ignore
|
||||
return {"summary": existing[Videos.description]} # type: ignore
|
||||
except Exception: # pragma: no cover
|
||||
pass
|
||||
# 查缓存(第一次,未加锁)
|
||||
cached = await self._get_cached(video_hash)
|
||||
if cached:
|
||||
logger.info(f"视频缓存命中(预检查) hash={video_hash[:16]}")
|
||||
return {"summary": cached}
|
||||
|
||||
# 获取锁避免重复处理
|
||||
async with _locks_guard:
|
||||
@@ -219,17 +215,11 @@ class VideoAnalyzer:
|
||||
lock = asyncio.Lock()
|
||||
_video_locks[video_hash] = lock
|
||||
async with lock:
|
||||
# 双检:进入锁后再查一次,避免重复处理
|
||||
try:
|
||||
async with get_db_session() as session: # type: ignore
|
||||
row = await session.execute(
|
||||
Videos.__table__.select().where(Videos.video_hash == video_hash) # type: ignore
|
||||
)
|
||||
existing = row.first()
|
||||
if existing and existing[Videos.description] and existing[Videos.vlm_processed]: # type: ignore
|
||||
return {"summary": existing[Videos.description]} # type: ignore
|
||||
except Exception: # pragma: no cover
|
||||
pass
|
||||
# 双检缓存
|
||||
cached2 = await self._get_cached(video_hash)
|
||||
if cached2:
|
||||
logger.info(f"视频缓存命中(锁后) hash={video_hash[:16]}")
|
||||
return {"summary": cached2}
|
||||
|
||||
try:
|
||||
with tempfile.NamedTemporaryFile(delete=False) as fp:
|
||||
@@ -239,26 +229,7 @@ class VideoAnalyzer:
|
||||
ok, summary = await self.analyze_video(temp_path, q)
|
||||
# 写入缓存(仅成功)
|
||||
if ok:
|
||||
try:
|
||||
async with get_db_session() as session: # type: ignore
|
||||
await session.execute(
|
||||
Videos.__table__.insert().values(
|
||||
video_id="",
|
||||
video_hash=video_hash,
|
||||
description=summary,
|
||||
count=1,
|
||||
timestamp=time.time(),
|
||||
vlm_processed=True,
|
||||
duration=None,
|
||||
frame_count=None,
|
||||
fps=None,
|
||||
resolution=None,
|
||||
file_size=len(video_bytes),
|
||||
)
|
||||
)
|
||||
await session.commit()
|
||||
except Exception: # pragma: no cover
|
||||
pass
|
||||
await self._save_cache(video_hash, summary, len(video_bytes))
|
||||
return {"summary": summary}
|
||||
finally:
|
||||
if os.path.exists(temp_path):
|
||||
@@ -269,6 +240,54 @@ class VideoAnalyzer:
|
||||
except Exception as e: # pragma: no cover
|
||||
return {"summary": f"❌ 处理失败: {e}"}
|
||||
|
||||
# ---- 缓存辅助 ----
|
||||
async def _get_cached(self, video_hash: str) -> Optional[str]:
|
||||
try:
|
||||
async with get_db_session() as session: # type: ignore
|
||||
result = await session.execute(select(Videos).where(Videos.video_hash == video_hash)) # type: ignore
|
||||
obj: Optional[Videos] = result.scalar_one_or_none() # type: ignore
|
||||
if obj and obj.vlm_processed and obj.description:
|
||||
# 更新使用次数
|
||||
try:
|
||||
await session.execute(
|
||||
update(Videos)
|
||||
.where(Videos.id == obj.id) # type: ignore
|
||||
.values(count=obj.count + 1 if obj.count is not None else 1)
|
||||
)
|
||||
await session.commit()
|
||||
except Exception: # pragma: no cover
|
||||
await session.rollback()
|
||||
return obj.description
|
||||
except Exception: # pragma: no cover
|
||||
pass
|
||||
return None
|
||||
|
||||
async def _save_cache(self, video_hash: str, summary: str, file_size: int) -> None:
|
||||
try:
|
||||
async with get_db_session() as session: # type: ignore
|
||||
stmt = insert(Videos).values( # type: ignore
|
||||
video_id="",
|
||||
video_hash=video_hash,
|
||||
description=summary,
|
||||
count=1,
|
||||
timestamp=time.time(),
|
||||
vlm_processed=True,
|
||||
duration=None,
|
||||
frame_count=None,
|
||||
fps=None,
|
||||
resolution=None,
|
||||
file_size=file_size,
|
||||
)
|
||||
try:
|
||||
await session.execute(stmt)
|
||||
await session.commit()
|
||||
logger.debug(f"视频缓存写入 success hash={video_hash}")
|
||||
except sa_exc.IntegrityError: # 可能并发已写入
|
||||
await session.rollback()
|
||||
logger.debug(f"视频缓存已存在 hash={video_hash}")
|
||||
except Exception: # pragma: no cover
|
||||
logger.debug("视频缓存写入失败")
|
||||
|
||||
|
||||
# ---- 外部接口 ----
|
||||
_INSTANCE: Optional[VideoAnalyzer] = None
|
||||
|
||||
Reference in New Issue
Block a user