优化项目结构,格式化代码

This commit is contained in:
雅诺狐
2025-08-13 09:49:39 +08:00
parent 26039b003c
commit fe97d897ef
24 changed files with 57 additions and 77 deletions

View File

@@ -0,0 +1,352 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
视频分析器模块 - 优化版本
支持多种分析模式:批处理、逐帧、自动选择
"""
import os
import cv2
import tempfile
import asyncio
import base64
from PIL import Image
from pathlib import Path
from typing import List, Tuple, Dict
import io
from src.llm_models.utils_model import LLMRequest
from src.config.config import global_config, model_config
from src.common.logger import get_logger
logger = get_logger("src.multimodal.video_analyzer")
class VideoAnalyzer:
"""优化的视频分析器类"""
def __init__(self):
"""初始化视频分析器"""
# 使用专用的视频分析配置
try:
self.video_llm = LLMRequest(
model_set=model_config.model_task_config.utils_video,
request_type="utils_video"
)
except (AttributeError, KeyError) as e:
# 如果utils_video不存在使用vlm配置
self.video_llm = LLMRequest(
model_set=model_config.model_task_config.vlm,
request_type="vlm"
)
logger.warning(f"utils_video配置不可用({e})回退使用vlm配置")
# 从配置文件读取参数,如果配置不存在则使用默认值
try:
config = global_config.utils_video
self.max_frames = config.max_frames
self.frame_quality = config.frame_quality
self.max_image_size = config.max_image_size
self.enable_frame_timing = config.enable_frame_timing
self.batch_analysis_prompt = config.batch_analysis_prompt
# 将配置文件中的模式映射到内部使用的模式名称
config_mode = config.analysis_mode
if config_mode == "batch_frames":
self.analysis_mode = "batch"
elif config_mode == "frame_by_frame":
self.analysis_mode = "sequential"
elif config_mode == "auto":
self.analysis_mode = "auto"
else:
logger.warning(f"无效的分析模式: {config_mode}使用默认的auto模式")
self.analysis_mode = "auto"
self.frame_analysis_delay = 0.3 # API调用间隔
self.frame_interval = 1.0 # 抽帧时间间隔(秒)
self.batch_size = 3 # 批处理时每批处理的帧数
self.timeout = 60.0 # 分析超时时间(秒)
logger.info("✅ 从配置文件读取视频分析参数")
except AttributeError as e:
# 如果配置不存在,使用代码中的默认值
logger.warning(f"配置文件中缺少utils_video配置({e}),使用默认值")
self.max_frames = 6
self.frame_quality = 85
self.max_image_size = 600
self.analysis_mode = "auto"
self.frame_analysis_delay = 0.3
self.frame_interval = 1.0 # 抽帧时间间隔(秒)
self.batch_size = 3 # 批处理时每批处理的帧数
self.timeout = 60.0 # 分析超时时间(秒)
self.enable_frame_timing = True
self.batch_analysis_prompt = """请分析这个视频的内容。这些图片是从视频中按时间顺序提取的关键帧。
请提供详细的分析,包括:
1. 视频的整体内容和主题
2. 主要人物、对象和场景描述
3. 动作、情节和时间线发展
4. 视觉风格和艺术特点
5. 整体氛围和情感表达
6. 任何特殊的视觉效果或文字内容
请用中文回答,分析要详细准确。"""
# 系统提示词
self.system_prompt = "你是一个专业的视频内容分析助手。请仔细观察用户提供的视频关键帧,详细描述视频内容。"
logger.info(f"✅ 视频分析器初始化完成,分析模式: {self.analysis_mode}")
def set_analysis_mode(self, mode: str):
"""设置分析模式"""
if mode in ["batch", "sequential", "auto"]:
self.analysis_mode = mode
# logger.info(f"分析模式已设置为: {mode}")
else:
logger.warning(f"无效的分析模式: {mode}")
async def extract_frames(self, video_path: str) -> List[Tuple[str, float]]:
"""提取视频帧"""
frames = []
cap = cv2.VideoCapture(video_path)
fps = cap.get(cv2.CAP_PROP_FPS)
total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
duration = total_frames / fps if fps > 0 else 0
logger.info(f"视频信息: {total_frames}帧, {fps:.2f}FPS, {duration:.2f}")
# 动态计算帧间隔
if duration > 0:
frame_interval = max(1, int(duration / self.max_frames * fps))
else:
frame_interval = 30 # 默认间隔
frame_count = 0
extracted_count = 0
while cap.isOpened() and extracted_count < self.max_frames:
ret, frame = cap.read()
if not ret:
break
if frame_count % frame_interval == 0:
# 转换为PIL图像并压缩
frame_rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
pil_image = Image.fromarray(frame_rgb)
# 调整图像大小
if max(pil_image.size) > self.max_image_size:
ratio = self.max_image_size / max(pil_image.size)
new_size = tuple(int(dim * ratio) for dim in pil_image.size)
pil_image = pil_image.resize(new_size, Image.Resampling.LANCZOS)
# 转换为base64
buffer = io.BytesIO()
pil_image.save(buffer, format='JPEG', quality=self.frame_quality)
frame_base64 = base64.b64encode(buffer.getvalue()).decode('utf-8')
# 计算时间戳
timestamp = frame_count / fps if fps > 0 else 0
frames.append((frame_base64, timestamp))
extracted_count += 1
logger.debug(f"📸 提取第{extracted_count}帧 (时间: {timestamp:.2f}s)")
frame_count += 1
cap.release()
logger.info(f"✅ 成功提取{len(frames)}")
return frames
async def analyze_frames_batch(self, frames: List[Tuple[str, float]], user_question: str = None) -> str:
"""批量分析所有帧"""
logger.info(f"开始批量分析{len(frames)}")
# 构建提示词
prompt = self.batch_analysis_prompt
if user_question:
prompt += f"\n\n用户问题: {user_question}"
# 添加帧信息到提示词
for i, (frame_base64, timestamp) in enumerate(frames):
if self.enable_frame_timing:
prompt += f"\n\n{i+1}帧 (时间: {timestamp:.2f}s):"
try:
# 使用第一帧进行分析(批量模式暂时使用单帧,后续可以优化为真正的多图片分析)
if frames:
frame_base64, _ = frames[0]
prompt += f"\n\n注意当前显示的是第1帧请基于这一帧和提示词进行分析。视频共有{len(frames)}帧。"
response, _ = await self.video_llm.generate_response_for_image(
prompt=prompt,
image_base64=frame_base64,
image_format="jpeg"
)
logger.info("✅ 批量分析完成")
return response
else:
return "❌ 没有可分析的帧"
except Exception as e:
logger.error(f"❌ 批量分析失败: {e}")
raise
async def analyze_frames_sequential(self, frames: List[Tuple[str, float]], user_question: str = None) -> str:
"""逐帧分析并汇总"""
logger.info(f"开始逐帧分析{len(frames)}")
frame_analyses = []
for i, (frame_base64, timestamp) in enumerate(frames):
try:
prompt = f"请分析这个视频的第{i+1}"
if self.enable_frame_timing:
prompt += f" (时间: {timestamp:.2f}s)"
prompt += "。描述你看到的内容,包括人物、动作、场景、文字等。"
if user_question:
prompt += f"\n特别关注: {user_question}"
response, _ = await self.video_llm.generate_response_for_image(
prompt=prompt,
image_base64=frame_base64,
image_format="jpeg"
)
frame_analyses.append(f"{i+1}帧 ({timestamp:.2f}s): {response}")
logger.debug(f"✅ 第{i+1}帧分析完成")
# API调用间隔
if i < len(frames) - 1:
await asyncio.sleep(self.frame_analysis_delay)
except Exception as e:
logger.error(f"❌ 第{i+1}帧分析失败: {e}")
frame_analyses.append(f"{i+1}帧: 分析失败 - {e}")
# 生成汇总
logger.info("开始生成汇总分析")
summary_prompt = f"""基于以下各帧的分析结果,请提供一个完整的视频内容总结:
{chr(10).join(frame_analyses)}
请综合所有帧的信息,描述视频的整体内容、故事线、主要元素和特点。"""
if user_question:
summary_prompt += f"\n特别回答用户的问题: {user_question}"
try:
# 使用最后一帧进行汇总分析
if frames:
last_frame_base64, _ = frames[-1]
summary, _ = await self.video_llm.generate_response_for_image(
prompt=summary_prompt,
image_base64=last_frame_base64,
image_format="jpeg"
)
logger.info("✅ 逐帧分析和汇总完成")
return summary
else:
return "❌ 没有可用于汇总的帧"
except Exception as e:
logger.error(f"❌ 汇总分析失败: {e}")
# 如果汇总失败,返回各帧分析结果
return f"视频逐帧分析结果:\n\n{chr(10).join(frame_analyses)}"
async def analyze_video(self, video_path: str, user_question: str = None) -> str:
"""分析视频的主要方法"""
try:
logger.info(f"开始分析视频: {os.path.basename(video_path)}")
# 提取帧
frames = await self.extract_frames(video_path)
if not frames:
return "❌ 无法从视频中提取有效帧"
# 根据模式选择分析方法
if self.analysis_mode == "auto":
# 智能选择少于等于3帧用批量否则用逐帧
mode = "batch" if len(frames) <= 3 else "sequential"
logger.info(f"自动选择分析模式: {mode} (基于{len(frames)}帧)")
else:
mode = self.analysis_mode
# 执行分析
if mode == "batch":
result = await self.analyze_frames_batch(frames, user_question)
else: # sequential
result = await self.analyze_frames_sequential(frames, user_question)
logger.info("✅ 视频分析完成")
return result
except Exception as e:
error_msg = f"❌ 视频分析失败: {str(e)}"
logger.error(error_msg)
return error_msg
async def analyze_video_from_bytes(self, video_bytes: bytes, filename: str = None, user_question: str = None, prompt: str = None) -> Dict[str, str]:
"""从字节数据分析视频
Args:
video_bytes: 视频字节数据
filename: 文件名(可选)
user_question: 用户问题(旧参数名,保持兼容性)
prompt: 提示词(新参数名,与系统调用保持一致)
Returns:
Dict[str, str]: 包含分析结果的字典,格式为 {"summary": "分析结果"}
"""
try:
logger.info("开始从字节数据分析视频")
# 兼容性处理如果传入了prompt参数使用prompt否则使用user_question
question = prompt if prompt is not None else user_question
# 检查视频数据是否有效
if not video_bytes:
return {"summary": "❌ 视频数据为空"}
# 创建临时文件保存视频数据
with tempfile.NamedTemporaryFile(delete=False, suffix='.mp4') as temp_file:
temp_file.write(video_bytes)
temp_path = temp_file.name
try:
# 检查临时文件是否创建成功
if not os.path.exists(temp_path):
return {"summary": "❌ 临时文件创建失败"}
# 使用临时文件进行分析
result = await self.analyze_video(temp_path, question)
return {"summary": result}
finally:
# 清理临时文件
try:
if os.path.exists(temp_path):
os.unlink(temp_path)
logger.debug("临时文件已清理")
except Exception as e:
logger.warning(f"清理临时文件失败: {e}")
except Exception as e:
error_msg = f"❌ 从字节数据分析视频失败: {str(e)}"
logger.error(error_msg)
return {"summary": error_msg}
def is_supported_video(self, file_path: str) -> bool:
"""检查是否为支持的视频格式"""
supported_formats = {'.mp4', '.avi', '.mov', '.mkv', '.flv', '.wmv', '.m4v', '.3gp', '.webm'}
return Path(file_path).suffix.lower() in supported_formats
# 全局实例
_video_analyzer = None
def get_video() -> VideoAnalyzer:
"""获取视频分析器实例"""
global _video_analyzer
if _video_analyzer is None:
_video_analyzer = VideoAnalyzer()
return _video_analyzer