Remove rust-video keyframe extraction API and related files
Deleted the entire src/chat/utils/rust-video directory, including Rust and Python source files, configuration, and documentation. Updated utils_video.py, official_configs.py, and bot_config_template.toml to remove or adjust references to the removed rust-video module. This cleans up the codebase by removing the integrated Rust-based keyframe extraction API and its supporting infrastructure.
This commit is contained in:
@@ -1,12 +1,12 @@
|
||||
#!/usr/bin/env python3
|
||||
# -*- coding: utf-8 -*-
|
||||
"""
|
||||
视频分析器模块 - 优化版本
|
||||
支持多种分析模式:批处理、逐帧、自动选择
|
||||
视频分析器模块 - Rust优化版本
|
||||
集成了Rust视频关键帧提取模块,提供高性能的视频分析功能
|
||||
支持SIMD优化、多线程处理和智能关键帧检测
|
||||
"""
|
||||
|
||||
import os
|
||||
import cv2
|
||||
import tempfile
|
||||
import asyncio
|
||||
import base64
|
||||
@@ -17,7 +17,6 @@ from PIL import Image
|
||||
from pathlib import Path
|
||||
from typing import List, Tuple, Optional, Dict
|
||||
import io
|
||||
from concurrent.futures import ThreadPoolExecutor
|
||||
|
||||
from src.llm_models.utils_model import LLMRequest
|
||||
from src.config.config import global_config, model_config
|
||||
@@ -26,6 +25,11 @@ from src.common.database.sqlalchemy_models import get_db_session, Videos
|
||||
|
||||
logger = get_logger("utils_video")
|
||||
|
||||
# 导入 Rust 视频处理模块
|
||||
import rust_video
|
||||
|
||||
logger.info("✅ Rust 视频处理模块加载成功")
|
||||
|
||||
# 全局正在处理的视频哈希集合,用于防止重复处理
|
||||
processing_videos = set()
|
||||
processing_lock = asyncio.Lock()
|
||||
@@ -35,110 +39,6 @@ video_events = {}
|
||||
video_lock_manager = asyncio.Lock()
|
||||
|
||||
|
||||
def _extract_frames_worker(video_path: str,
|
||||
max_frames: int,
|
||||
frame_quality: int,
|
||||
max_image_size: int,
|
||||
frame_extraction_mode: str,
|
||||
frame_interval_seconds: Optional[float]) -> List[Tuple[str, float]]:
|
||||
"""线程池中提取视频帧的工作函数"""
|
||||
frames = []
|
||||
try:
|
||||
cap = cv2.VideoCapture(video_path)
|
||||
fps = cap.get(cv2.CAP_PROP_FPS)
|
||||
total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
|
||||
duration = total_frames / fps if fps > 0 else 0
|
||||
|
||||
if frame_extraction_mode == "time_interval":
|
||||
# 新模式:按时间间隔抽帧
|
||||
time_interval = frame_interval_seconds
|
||||
next_frame_time = 0.0
|
||||
extracted_count = 0 # 初始化提取帧计数器
|
||||
|
||||
while cap.isOpened():
|
||||
ret, frame = cap.read()
|
||||
if not ret:
|
||||
break
|
||||
|
||||
current_time = cap.get(cv2.CAP_PROP_POS_MSEC) / 1000.0
|
||||
|
||||
if current_time >= next_frame_time:
|
||||
# 转换为PIL图像并压缩
|
||||
frame_rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
|
||||
pil_image = Image.fromarray(frame_rgb)
|
||||
|
||||
# 调整图像大小
|
||||
if max(pil_image.size) > max_image_size:
|
||||
ratio = max_image_size / max(pil_image.size)
|
||||
new_size = tuple(int(dim * ratio) for dim in pil_image.size)
|
||||
pil_image = pil_image.resize(new_size, Image.Resampling.LANCZOS)
|
||||
|
||||
# 转换为base64
|
||||
buffer = io.BytesIO()
|
||||
pil_image.save(buffer, format='JPEG', quality=frame_quality)
|
||||
frame_base64 = base64.b64encode(buffer.getvalue()).decode('utf-8')
|
||||
|
||||
frames.append((frame_base64, current_time))
|
||||
extracted_count += 1
|
||||
|
||||
# 注意:这里不能使用logger,因为在线程池中
|
||||
# logger.debug(f"提取第{extracted_count}帧 (时间: {current_time:.2f}s)")
|
||||
|
||||
next_frame_time += time_interval
|
||||
else:
|
||||
# 使用numpy优化帧间隔计算
|
||||
if duration > 0:
|
||||
frame_interval = max(1, int(duration / max_frames * fps))
|
||||
else:
|
||||
frame_interval = 30 # 默认间隔
|
||||
|
||||
# 使用numpy计算目标帧位置
|
||||
target_frames = np.arange(0, min(max_frames, total_frames // frame_interval + 1)) * frame_interval
|
||||
target_frames = target_frames[target_frames < total_frames].astype(int)
|
||||
|
||||
for target_frame in target_frames:
|
||||
# 跳转到目标帧
|
||||
cap.set(cv2.CAP_PROP_POS_FRAMES, target_frame)
|
||||
ret, frame = cap.read()
|
||||
if not ret:
|
||||
continue
|
||||
|
||||
# 使用numpy优化图像处理
|
||||
frame_rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
|
||||
|
||||
# 转换为PIL图像并使用numpy进行尺寸计算
|
||||
height, width = frame_rgb.shape[:2]
|
||||
max_dim = max(height, width)
|
||||
|
||||
if max_dim > max_image_size:
|
||||
# 使用numpy计算缩放比例
|
||||
ratio = max_image_size / max_dim
|
||||
new_width = int(width * ratio)
|
||||
new_height = int(height * ratio)
|
||||
|
||||
# 使用opencv进行高效缩放
|
||||
frame_resized = cv2.resize(frame_rgb, (new_width, new_height), interpolation=cv2.INTER_LANCZOS4)
|
||||
pil_image = Image.fromarray(frame_resized)
|
||||
else:
|
||||
pil_image = Image.fromarray(frame_rgb)
|
||||
|
||||
# 转换为base64
|
||||
buffer = io.BytesIO()
|
||||
pil_image.save(buffer, format='JPEG', quality=frame_quality)
|
||||
frame_base64 = base64.b64encode(buffer.getvalue()).decode('utf-8')
|
||||
|
||||
# 计算时间戳
|
||||
timestamp = target_frame / fps if fps > 0 else 0
|
||||
frames.append((frame_base64, timestamp))
|
||||
|
||||
cap.release()
|
||||
return frames
|
||||
|
||||
except Exception as e:
|
||||
# 返回错误信息
|
||||
return [("ERROR", str(e))]
|
||||
|
||||
|
||||
class VideoAnalyzer:
|
||||
"""优化的视频分析器类"""
|
||||
|
||||
@@ -168,6 +68,13 @@ class VideoAnalyzer:
|
||||
self.max_image_size = getattr(config, 'max_image_size', 600)
|
||||
self.enable_frame_timing = getattr(config, 'enable_frame_timing', True)
|
||||
|
||||
# Rust模块相关配置
|
||||
self.rust_keyframe_threshold = getattr(config, 'rust_keyframe_threshold', 2.0)
|
||||
self.rust_use_simd = getattr(config, 'rust_use_simd', True)
|
||||
self.rust_block_size = getattr(config, 'rust_block_size', 8192)
|
||||
self.rust_threads = getattr(config, 'rust_threads', 0)
|
||||
self.ffmpeg_path = getattr(config, 'ffmpeg_path', 'ffmpeg')
|
||||
|
||||
# 从personality配置中获取人格信息
|
||||
try:
|
||||
personality_config = global_config.personality
|
||||
@@ -225,6 +132,34 @@ class VideoAnalyzer:
|
||||
self.system_prompt = "你是一个专业的视频内容分析助手。请仔细观察用户提供的视频关键帧,详细描述视频内容。"
|
||||
|
||||
logger.info(f"✅ 视频分析器初始化完成,分析模式: {self.analysis_mode}, 线程池: {self.use_multiprocessing}")
|
||||
|
||||
# 获取Rust模块系统信息
|
||||
self._log_system_info()
|
||||
|
||||
def _log_system_info(self):
    """Log the system information reported by the ``rust_video`` module.

    Writes three info-level lines: the thread count, the detected CPU
    features (or a notice that none were detected), and the module version.
    Any exception raised while querying or logging is caught and reported
    as a warning instead of propagating.
    """
    try:
        info = rust_video.get_system_info()

        thread_count = info.get('threads', '未知')
        logger.info(f"🔧 系统信息: 线程数={thread_count}")

        # Map each capability flag in the report to the label shown in the log.
        flag_labels = (
            ('avx2_supported', 'AVX2'),
            ('sse2_supported', 'SSE2'),
            ('simd_supported', 'SIMD'),
        )
        detected = [label for key, label in flag_labels if info.get(key)]

        if not detected:
            logger.info("⚠️ 未检测到SIMD支持")
        else:
            logger.info(f"🚀 CPU特性: {', '.join(detected)}")

        module_version = info.get('version', '未知')
        logger.info(f"📦 Rust模块版本: {module_version}")

    except Exception as err:
        logger.warning(f"获取系统信息失败: {err}")
|
||||
|
||||
def _calculate_video_hash(self, video_data: bytes) -> str:
|
||||
"""计算视频文件的hash值"""
|
||||
@@ -245,6 +180,11 @@ class VideoAnalyzer:
|
||||
|
||||
def _store_video_result(self, video_hash: str, description: str, metadata: Optional[Dict] = None) -> Optional[Videos]:
|
||||
"""存储视频分析结果到数据库"""
|
||||
# 检查描述是否为错误信息,如果是则不保存
|
||||
if description.startswith("❌"):
|
||||
logger.warning(f"⚠️ 检测到错误信息,不保存到数据库: {description[:50]}...")
|
||||
return None
|
||||
|
||||
try:
|
||||
with get_db_session() as session:
|
||||
# 只根据video_hash查找
|
||||
@@ -299,171 +239,169 @@ class VideoAnalyzer:
|
||||
logger.warning(f"无效的分析模式: {mode}")
|
||||
|
||||
async def extract_frames(self, video_path: str) -> List[Tuple[str, float]]:
    """Extract frames from a video using the thread pool or the single-threaded path.

    The video is first probed for frame count and FPS so that the expected
    sampling step can be logged. Extraction is then delegated to
    ``_extract_frames_multiprocess`` when ``self.use_multiprocessing`` is
    truthy, otherwise to ``_extract_frames_fallback``.

    Args:
        video_path: Path of the video file to read.

    Returns:
        The list of ``(base64_jpeg, timestamp)`` tuples produced by the
        selected extraction method.
    """
    # Probe the video; release the handle even if a property query raises.
    cap = cv2.VideoCapture(video_path)
    try:
        fps = cap.get(cv2.CAP_PROP_FPS)
        total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
    finally:
        cap.release()
    duration = total_frames / fps if fps > 0 else 0

    logger.info(f"视频信息: {total_frames}帧, {fps:.2f}FPS, {duration:.2f}秒")

    # Estimate how many frames will be extracted.
    if duration > 0:
        frame_interval = max(1, int(duration / self.max_frames * fps))
        estimated_frames = min(self.max_frames, total_frames // frame_interval + 1)
    else:
        # Previously frame_interval was left unassigned on this path, so the
        # log line below raised UnboundLocalError for videos with unknown
        # duration. 30 matches the default step used by the extraction worker.
        frame_interval = 30
        estimated_frames = self.max_frames

    logger.info(f"计算得出帧间隔: {frame_interval} (将提取约{estimated_frames}帧)")

    # Pick the extraction strategy from the configuration.
    if self.use_multiprocessing:
        return await self._extract_frames_multiprocess(video_path)
    return await self._extract_frames_fallback(video_path)
|
||||
|
||||
async def _extract_frames_multiprocess(self, video_path: str) -> List[Tuple[str, float]]:
|
||||
"""线程池版本的帧提取"""
|
||||
loop = asyncio.get_event_loop()
|
||||
|
||||
"""提取视频帧 - 使用 Rust 实现"""
|
||||
# 优先尝试高级接口,失败时回退到基础接口
|
||||
try:
|
||||
logger.info("🔄 启动线程池帧提取...")
|
||||
# 使用线程池,避免进程间的导入问题
|
||||
with ThreadPoolExecutor(max_workers=1) as executor:
|
||||
frames = await loop.run_in_executor(
|
||||
executor,
|
||||
_extract_frames_worker,
|
||||
video_path,
|
||||
self.max_frames,
|
||||
self.frame_quality,
|
||||
self.max_image_size,
|
||||
self.frame_extraction_mode,
|
||||
self.frame_interval_seconds
|
||||
)
|
||||
return await self._extract_frames_rust_advanced(video_path)
|
||||
except Exception as e:
|
||||
logger.warning(f"高级接口失败: {e},使用基础接口")
|
||||
return await self._extract_frames_rust(video_path)
|
||||
|
||||
async def _extract_frames_rust_advanced(self, video_path: str) -> List[Tuple[str, float]]:
|
||||
"""使用 Rust 高级接口的帧提取"""
|
||||
try:
|
||||
logger.info("🔄 使用 Rust 高级接口提取关键帧...")
|
||||
|
||||
# 检查是否有错误
|
||||
if frames and frames[0][0] == "ERROR":
|
||||
logger.error(f"线程池帧提取失败: {frames[0][1]}")
|
||||
# 降级到单线程模式
|
||||
logger.info("🔄 降级到单线程模式...")
|
||||
return await self._extract_frames_fallback(video_path)
|
||||
# 创建 Rust 视频处理器,使用配置参数
|
||||
extractor = rust_video.VideoKeyframeExtractor(
|
||||
ffmpeg_path=self.ffmpeg_path,
|
||||
threads=self.rust_threads,
|
||||
verbose=False # 使用固定值,不需要配置
|
||||
)
|
||||
|
||||
logger.info(f"✅ 成功提取{len(frames)}帧 (线程池模式)")
|
||||
# 1. 提取所有帧
|
||||
frames_data, width, height = extractor.extract_frames(
|
||||
video_path=video_path,
|
||||
max_frames=self.max_frames * 3 # 提取更多帧用于关键帧检测
|
||||
)
|
||||
|
||||
logger.info(f"提取到 {len(frames_data)} 帧,视频尺寸: {width}x{height}")
|
||||
|
||||
# 2. 检测关键帧,使用配置参数
|
||||
keyframe_indices = extractor.extract_keyframes(
|
||||
frames=frames_data,
|
||||
threshold=self.rust_keyframe_threshold,
|
||||
use_simd=self.rust_use_simd,
|
||||
block_size=self.rust_block_size
|
||||
)
|
||||
|
||||
logger.info(f"检测到 {len(keyframe_indices)} 个关键帧")
|
||||
|
||||
# 3. 转换选定的关键帧为 base64
|
||||
frames = []
|
||||
frame_count = 0
|
||||
|
||||
for idx in keyframe_indices[:self.max_frames]:
|
||||
if idx < len(frames_data):
|
||||
try:
|
||||
frame = frames_data[idx]
|
||||
frame_data = frame.get_data()
|
||||
|
||||
# 将灰度数据转换为PIL图像
|
||||
frame_array = np.frombuffer(frame_data, dtype=np.uint8).reshape((frame.height, frame.width))
|
||||
pil_image = Image.fromarray(
|
||||
frame_array,
|
||||
mode='L' # 灰度模式
|
||||
)
|
||||
|
||||
# 转换为RGB模式以便保存为JPEG
|
||||
pil_image = pil_image.convert('RGB')
|
||||
|
||||
# 调整图像大小
|
||||
if max(pil_image.size) > self.max_image_size:
|
||||
ratio = self.max_image_size / max(pil_image.size)
|
||||
new_size = tuple(int(dim * ratio) for dim in pil_image.size)
|
||||
pil_image = pil_image.resize(new_size, Image.Resampling.LANCZOS)
|
||||
|
||||
# 转换为 base64
|
||||
buffer = io.BytesIO()
|
||||
pil_image.save(buffer, format='JPEG', quality=self.frame_quality)
|
||||
frame_base64 = base64.b64encode(buffer.getvalue()).decode('utf-8')
|
||||
|
||||
# 估算时间戳
|
||||
estimated_timestamp = frame.frame_number * (1.0 / 30.0) # 假设30fps
|
||||
|
||||
frames.append((frame_base64, estimated_timestamp))
|
||||
frame_count += 1
|
||||
|
||||
logger.debug(f"处理关键帧 {frame_count}: 帧号 {frame.frame_number}, 时间 {estimated_timestamp:.2f}s")
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"处理关键帧 {idx} 失败: {e}")
|
||||
continue
|
||||
|
||||
logger.info(f"✅ Rust 高级提取完成: {len(frames)} 关键帧")
|
||||
return frames
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"线程池帧提取失败: {e}")
|
||||
# 降级到原始方法
|
||||
logger.info("🔄 降级到单线程模式...")
|
||||
return await self._extract_frames_fallback(video_path)
|
||||
logger.error(f"❌ Rust 高级帧提取失败: {e}")
|
||||
# 回退到基础方法
|
||||
logger.info("回退到基础 Rust 方法")
|
||||
return await self._extract_frames_rust(video_path)
|
||||
|
||||
async def _extract_frames_fallback(self, video_path: str) -> List[Tuple[str, float]]:
|
||||
"""帧提取的降级方法 - 原始异步版本"""
|
||||
frames = []
|
||||
extracted_count = 0
|
||||
cap = cv2.VideoCapture(video_path)
|
||||
fps = cap.get(cv2.CAP_PROP_FPS)
|
||||
total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
|
||||
duration = total_frames / fps if fps > 0 else 0
|
||||
|
||||
logger.info(f"视频信息: {total_frames}帧, {fps:.2f}FPS, {duration:.2f}秒")
|
||||
|
||||
|
||||
if self.frame_extraction_mode == "time_interval":
|
||||
# 新模式:按时间间隔抽帧
|
||||
time_interval = self.frame_interval_seconds
|
||||
next_frame_time = 0.0
|
||||
async def _extract_frames_rust(self, video_path: str) -> List[Tuple[str, float]]:
|
||||
"""使用 Rust 实现的帧提取"""
|
||||
try:
|
||||
logger.info("🔄 使用 Rust 模块提取关键帧...")
|
||||
|
||||
while cap.isOpened():
|
||||
ret, frame = cap.read()
|
||||
if not ret:
|
||||
break
|
||||
# 创建临时输出目录
|
||||
with tempfile.TemporaryDirectory() as temp_dir:
|
||||
# 使用便捷函数进行关键帧提取,使用配置参数
|
||||
result = rust_video.extract_keyframes_from_video(
|
||||
video_path=video_path,
|
||||
output_dir=temp_dir,
|
||||
threshold=self.rust_keyframe_threshold,
|
||||
max_frames=self.max_frames * 2, # 提取更多帧以便筛选
|
||||
max_save=self.max_frames,
|
||||
ffmpeg_path=self.ffmpeg_path,
|
||||
use_simd=self.rust_use_simd,
|
||||
threads=self.rust_threads,
|
||||
verbose=False # 使用固定值,不需要配置
|
||||
)
|
||||
|
||||
current_time = cap.get(cv2.CAP_PROP_POS_MSEC) / 1000.0
|
||||
logger.info(f"Rust 处理完成: 总帧数 {result.total_frames}, 关键帧 {result.keyframes_extracted}, 处理速度 {result.processing_fps:.1f} FPS")
|
||||
|
||||
if current_time >= next_frame_time:
|
||||
# 转换为PIL图像并压缩
|
||||
frame_rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
|
||||
pil_image = Image.fromarray(frame_rgb)
|
||||
|
||||
# 调整图像大小
|
||||
if max(pil_image.size) > self.max_image_size:
|
||||
ratio = self.max_image_size / max(pil_image.size)
|
||||
new_size = tuple(int(dim * ratio) for dim in pil_image.size)
|
||||
pil_image = pil_image.resize(new_size, Image.Resampling.LANCZOS)
|
||||
|
||||
# 转换为base64
|
||||
buffer = io.BytesIO()
|
||||
pil_image.save(buffer, format='JPEG', quality=self.frame_quality)
|
||||
frame_base64 = base64.b64encode(buffer.getvalue()).decode('utf-8')
|
||||
|
||||
frames.append((frame_base64, current_time))
|
||||
extracted_count += 1
|
||||
|
||||
logger.debug(f"提取第{extracted_count}帧 (时间: {current_time:.2f}s)")
|
||||
|
||||
next_frame_time += time_interval
|
||||
else:
|
||||
# 使用numpy优化帧间隔计算
|
||||
if duration > 0:
|
||||
frame_interval = max(1, int(duration / self.max_frames * fps))
|
||||
else:
|
||||
frame_interval = 30 # 默认间隔
|
||||
# 转换保存的关键帧为 base64 格式
|
||||
frames = []
|
||||
temp_dir_path = Path(temp_dir)
|
||||
|
||||
logger.info(f"计算得出帧间隔: {frame_interval} (将提取约{min(self.max_frames, total_frames // frame_interval + 1)}帧)")
|
||||
|
||||
# 使用numpy计算目标帧位置
|
||||
target_frames = np.arange(0, min(self.max_frames, total_frames // frame_interval + 1)) * frame_interval
|
||||
target_frames = target_frames[target_frames < total_frames].astype(int)
|
||||
|
||||
extracted_count = 0
|
||||
|
||||
for target_frame in target_frames:
|
||||
# 跳转到目标帧
|
||||
cap.set(cv2.CAP_PROP_POS_FRAMES, target_frame)
|
||||
ret, frame = cap.read()
|
||||
if not ret:
|
||||
continue
|
||||
|
||||
# 使用numpy优化图像处理
|
||||
frame_rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
|
||||
# 获取所有保存的关键帧文件
|
||||
keyframe_files = sorted(temp_dir_path.glob("keyframe_*.jpg"))
|
||||
|
||||
# 转换为PIL图像并使用numpy进行尺寸计算
|
||||
height, width = frame_rgb.shape[:2]
|
||||
max_dim = max(height, width)
|
||||
for i, keyframe_file in enumerate(keyframe_files):
|
||||
if len(frames) >= self.max_frames:
|
||||
break
|
||||
|
||||
try:
|
||||
# 读取关键帧文件
|
||||
with open(keyframe_file, 'rb') as f:
|
||||
image_data = f.read()
|
||||
|
||||
# 转换为 PIL 图像并压缩
|
||||
pil_image = Image.open(io.BytesIO(image_data))
|
||||
|
||||
# 调整图像大小
|
||||
if max(pil_image.size) > self.max_image_size:
|
||||
ratio = self.max_image_size / max(pil_image.size)
|
||||
new_size = tuple(int(dim * ratio) for dim in pil_image.size)
|
||||
pil_image = pil_image.resize(new_size, Image.Resampling.LANCZOS)
|
||||
|
||||
# 转换为 base64
|
||||
buffer = io.BytesIO()
|
||||
pil_image.save(buffer, format='JPEG', quality=self.frame_quality)
|
||||
frame_base64 = base64.b64encode(buffer.getvalue()).decode('utf-8')
|
||||
|
||||
# 估算时间戳(基于帧索引和总时长)
|
||||
if result.total_frames > 0:
|
||||
# 假设关键帧在时间上均匀分布
|
||||
estimated_timestamp = (i * result.total_time_ms / 1000.0) / result.keyframes_extracted
|
||||
else:
|
||||
estimated_timestamp = i * 1.0 # 默认每秒一帧
|
||||
|
||||
frames.append((frame_base64, estimated_timestamp))
|
||||
|
||||
logger.debug(f"处理关键帧 {i+1}: 估算时间 {estimated_timestamp:.2f}s")
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"处理关键帧 {keyframe_file.name} 失败: {e}")
|
||||
continue
|
||||
|
||||
if max_dim > self.max_image_size:
|
||||
# 使用numpy计算缩放比例
|
||||
ratio = self.max_image_size / max_dim
|
||||
new_width = int(width * ratio)
|
||||
new_height = int(height * ratio)
|
||||
|
||||
# 使用opencv进行高效缩放
|
||||
frame_resized = cv2.resize(frame_rgb, (new_width, new_height), interpolation=cv2.INTER_LANCZOS4)
|
||||
pil_image = Image.fromarray(frame_resized)
|
||||
else:
|
||||
pil_image = Image.fromarray(frame_rgb)
|
||||
logger.info(f"✅ Rust 提取完成: {len(frames)} 关键帧")
|
||||
return frames
|
||||
|
||||
# 转换为base64
|
||||
buffer = io.BytesIO()
|
||||
pil_image.save(buffer, format='JPEG', quality=self.frame_quality)
|
||||
frame_base64 = base64.b64encode(buffer.getvalue()).decode('utf-8')
|
||||
|
||||
# 计算时间戳
|
||||
timestamp = target_frame / fps if fps > 0 else 0
|
||||
frames.append((frame_base64, timestamp))
|
||||
extracted_count += 1
|
||||
|
||||
logger.debug(f"提取第{extracted_count}帧 (时间: {timestamp:.2f}s, 帧号: {target_frame})")
|
||||
|
||||
# 每提取一帧让步一次
|
||||
await asyncio.sleep(0.001)
|
||||
|
||||
cap.release()
|
||||
logger.info(f"✅ 成功提取{len(frames)}帧")
|
||||
return frames
|
||||
except Exception as e:
|
||||
logger.error(f"❌ Rust 帧提取失败: {e}")
|
||||
raise e
|
||||
|
||||
async def analyze_frames_batch(self, frames: List[Tuple[str, float]], user_question: str = None) -> str:
|
||||
"""批量分析所有帧"""
|
||||
@@ -493,29 +431,14 @@ class VideoAnalyzer:
|
||||
prompt += "\n\n请基于所有提供的帧图像进行综合分析,关注并描述视频的完整内容和故事发展。"
|
||||
|
||||
try:
|
||||
# 尝试使用多图片分析
|
||||
# 使用多图片分析
|
||||
response = await self._analyze_multiple_frames(frames, prompt)
|
||||
logger.info("✅ 视频识别完成")
|
||||
return response
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"❌ 视频识别失败: {e}")
|
||||
# 降级到单帧分析
|
||||
logger.warning("降级到单帧分析模式")
|
||||
try:
|
||||
frame_base64, timestamp = frames[0]
|
||||
fallback_prompt = prompt + f"\n\n注意:由于技术限制,当前仅显示第1帧 (时间: {timestamp:.2f}s),视频共有{len(frames)}帧。请基于这一帧进行分析。"
|
||||
|
||||
response, _ = await self.video_llm.generate_response_for_image(
|
||||
prompt=fallback_prompt,
|
||||
image_base64=frame_base64,
|
||||
image_format="jpeg"
|
||||
)
|
||||
logger.info("✅ 降级的单帧分析完成")
|
||||
return response
|
||||
except Exception as fallback_e:
|
||||
logger.error(f"❌ 降级分析也失败: {fallback_e}")
|
||||
raise
|
||||
raise e
|
||||
|
||||
async def _analyze_multiple_frames(self, frames: List[Tuple[str, float]], prompt: str) -> str:
|
||||
"""使用多图片分析方法"""
|
||||
@@ -616,15 +539,20 @@ class VideoAnalyzer:
|
||||
# 如果汇总失败,返回各帧分析结果
|
||||
return f"视频逐帧分析结果:\n\n{chr(10).join(frame_analyses)}"
|
||||
|
||||
async def analyze_video(self, video_path: str, user_question: str = None) -> str:
|
||||
"""分析视频的主要方法"""
|
||||
async def analyze_video(self, video_path: str, user_question: str = None) -> Tuple[bool, str]:
|
||||
"""分析视频的主要方法
|
||||
|
||||
Returns:
|
||||
Tuple[bool, str]: (是否成功, 分析结果或错误信息)
|
||||
"""
|
||||
try:
|
||||
logger.info(f"开始分析视频: {os.path.basename(video_path)}")
|
||||
|
||||
# 提取帧
|
||||
frames = await self.extract_frames(video_path)
|
||||
if not frames:
|
||||
return "❌ 无法从视频中提取有效帧"
|
||||
error_msg = "❌ 无法从视频中提取有效帧"
|
||||
return (False, error_msg)
|
||||
|
||||
# 根据模式选择分析方法
|
||||
if self.analysis_mode == "auto":
|
||||
@@ -641,12 +569,12 @@ class VideoAnalyzer:
|
||||
result = await self.analyze_frames_sequential(frames, user_question)
|
||||
|
||||
logger.info("✅ 视频分析完成")
|
||||
return result
|
||||
return (True, result)
|
||||
|
||||
except Exception as e:
|
||||
error_msg = f"❌ 视频分析失败: {str(e)}"
|
||||
logger.error(error_msg)
|
||||
return error_msg
|
||||
return (False, error_msg)
|
||||
|
||||
async def analyze_video_from_bytes(self, video_bytes: bytes, filename: str = None, user_question: str = None, prompt: str = None) -> Dict[str, str]:
|
||||
"""从字节数据分析视频
|
||||
@@ -714,70 +642,60 @@ class VideoAnalyzer:
|
||||
logger.info(f"✅ 获得锁后发现已有结果,直接返回 (id: {existing_video.id})")
|
||||
video_event.set() # 通知其他等待者
|
||||
return {"summary": existing_video.description}
|
||||
|
||||
# 未找到已存在记录,开始新的分析
|
||||
logger.info("未找到已存在的视频记录,开始新的分析")
|
||||
|
||||
# 创建临时文件进行分析
|
||||
with tempfile.NamedTemporaryFile(delete=False, suffix='.mp4') as temp_file:
|
||||
temp_file.write(video_bytes)
|
||||
temp_path = temp_file.name
|
||||
|
||||
try:
|
||||
# 检查临时文件是否创建成功
|
||||
if not os.path.exists(temp_path):
|
||||
video_event.set() # 通知等待者
|
||||
return {"summary": "❌ 临时文件创建失败"}
|
||||
|
||||
# 使用临时文件进行分析
|
||||
result = await self.analyze_video(temp_path, question)
|
||||
# 未找到已存在记录,开始新的分析
|
||||
logger.info("未找到已存在的视频记录,开始新的分析")
|
||||
|
||||
finally:
|
||||
# 清理临时文件
|
||||
if os.path.exists(temp_path):
|
||||
os.unlink(temp_path)
|
||||
|
||||
# 保存分析结果到数据库
|
||||
metadata = {
|
||||
"filename": filename,
|
||||
"file_size": len(video_bytes),
|
||||
"analysis_timestamp": time.time()
|
||||
}
|
||||
self._store_video_result(
|
||||
video_hash=video_hash,
|
||||
description=result,
|
||||
metadata=metadata
|
||||
)
|
||||
|
||||
# 处理完成,通知等待者并清理资源
|
||||
video_event.set()
|
||||
async with video_lock_manager:
|
||||
# 清理资源
|
||||
video_locks.pop(video_hash, None)
|
||||
video_events.pop(video_hash, None)
|
||||
|
||||
return {"summary": result}
|
||||
# 创建临时文件进行分析
|
||||
with tempfile.NamedTemporaryFile(delete=False, suffix='.mp4') as temp_file:
|
||||
temp_file.write(video_bytes)
|
||||
temp_path = temp_file.name
|
||||
|
||||
try:
|
||||
# 检查临时文件是否创建成功
|
||||
if not os.path.exists(temp_path):
|
||||
video_event.set() # 通知等待者
|
||||
return {"summary": "❌ 临时文件创建失败"}
|
||||
|
||||
# 使用临时文件进行分析
|
||||
success, result = await self.analyze_video(temp_path, question)
|
||||
|
||||
finally:
|
||||
# 清理临时文件
|
||||
if os.path.exists(temp_path):
|
||||
os.unlink(temp_path)
|
||||
|
||||
# 保存分析结果到数据库(仅保存成功的结果)
|
||||
if success:
|
||||
metadata = {
|
||||
"filename": filename,
|
||||
"file_size": len(video_bytes),
|
||||
"analysis_timestamp": time.time()
|
||||
}
|
||||
self._store_video_result(
|
||||
video_hash=video_hash,
|
||||
description=result,
|
||||
metadata=metadata
|
||||
)
|
||||
logger.info("✅ 分析结果已保存到数据库")
|
||||
else:
|
||||
logger.warning("⚠️ 分析失败,不保存到数据库以便后续重试")
|
||||
|
||||
# 处理完成,通知等待者并清理资源
|
||||
video_event.set()
|
||||
async with video_lock_manager:
|
||||
# 清理资源
|
||||
video_locks.pop(video_hash, None)
|
||||
video_events.pop(video_hash, None)
|
||||
|
||||
return {"summary": result}
|
||||
|
||||
except Exception as e:
|
||||
error_msg = f"❌ 从字节数据分析视频失败: {str(e)}"
|
||||
logger.error(error_msg)
|
||||
|
||||
# 即使失败也保存错误信息到数据库,避免重复处理
|
||||
try:
|
||||
metadata = {
|
||||
"filename": filename,
|
||||
"file_size": len(video_bytes),
|
||||
"analysis_timestamp": time.time(),
|
||||
"error": str(e)
|
||||
}
|
||||
self._store_video_result(
|
||||
video_hash=video_hash,
|
||||
description=error_msg,
|
||||
metadata=metadata
|
||||
)
|
||||
logger.info("✅ 错误信息已保存到数据库")
|
||||
except Exception as store_e:
|
||||
logger.error(f"❌ 保存错误信息失败: {store_e}")
|
||||
# 不保存错误信息到数据库,允许后续重试
|
||||
logger.info("💡 错误信息不保存到数据库,允许后续重试")
|
||||
|
||||
# 处理失败,通知等待者并清理资源
|
||||
try:
|
||||
@@ -797,6 +715,54 @@ class VideoAnalyzer:
|
||||
supported_formats = {'.mp4', '.avi', '.mov', '.mkv', '.flv', '.wmv', '.m4v', '.3gp', '.webm'}
|
||||
return Path(file_path).suffix.lower() in supported_formats
|
||||
|
||||
def get_processing_capabilities(self) -> Dict[str, any]:
    """Collect a description of the video-processing capabilities.

    Queries ``rust_video`` for system information and CPU features and
    combines them with the recommended settings, the available analysis
    modes and the supported file extensions.

    Returns:
        A dictionary with the keys ``system``, ``cpu_features``,
        ``recommended_settings``, ``analysis_modes`` and
        ``supported_formats``. If any step raises, the error is logged and
        ``{"error": <message>}`` is returned instead.
    """
    # NOTE(review): ``any`` in the annotation is the builtin function, not
    # ``typing.Any`` — probably a typo; left unchanged to keep the signature.
    try:
        info = rust_video.get_system_info()

        # A throwaway extractor is created only to read the CPU features.
        probe = rust_video.VideoKeyframeExtractor(threads=0, verbose=False)
        cpu_features = probe.get_cpu_features()

        system_section = {
            "threads": info.get('threads', 0),
            "rust_version": info.get('version', 'unknown'),
        }
        recommended = self._get_recommended_settings(cpu_features)

        return {
            "system": system_section,
            "cpu_features": cpu_features,
            "recommended_settings": recommended,
            "analysis_modes": ["auto", "batch", "sequential"],
            "supported_formats": ['.mp4', '.avi', '.mov', '.mkv', '.flv', '.wmv', '.m4v', '.3gp', '.webm'],
        }

    except Exception as err:
        logger.error(f"获取处理能力信息失败: {err}")
        return {"error": str(err)}
|
||||
|
||||
def _get_recommended_settings(self, cpu_features: Dict[str, bool]) -> Dict[str, any]:
|
||||
"""根据CPU特性推荐最佳设置"""
|
||||
settings = {
|
||||
"use_simd": any(cpu_features.values()),
|
||||
"block_size": 8192,
|
||||
"threads": 0 # 自动检测
|
||||
}
|
||||
|
||||
# 根据CPU特性调整设置
|
||||
if cpu_features.get('avx2', False):
|
||||
settings["block_size"] = 16384 # AVX2支持更大的块
|
||||
settings["optimization_level"] = "avx2"
|
||||
elif cpu_features.get('sse2', False):
|
||||
settings["block_size"] = 8192
|
||||
settings["optimization_level"] = "sse2"
|
||||
else:
|
||||
settings["use_simd"] = False
|
||||
settings["block_size"] = 4096
|
||||
settings["optimization_level"] = "scalar"
|
||||
|
||||
return settings
|
||||
|
||||
|
||||
# 全局实例
|
||||
_video_analyzer = None
|
||||
|
||||
Reference in New Issue
Block a user