fix: 修复视频分析并发处理和数据库存储问题

🔧 修复视频分析模块的关键并发和存储问题

**主要修复:**
1. **并发竞争条件修复**
   - 解决相同视频hash同时处理导致重复分析的问题
   - 重构并发控制机制,使用per-video独立锁和事件
   - 消除30秒超时后多个请求同时执行的竞争条件

2. **数据库存储优化**
   - 移除Videos表path字段的复杂唯一性检查逻辑
   - 简化为基于hash的纯唯一标识存储
   - 修复path字段重复导致的IntegrityError

3. **代码简化和清理**
   - 删除重编码视频检测功能(复杂且用处有限)
   - 移除不必要的特征匹配和计数更新逻辑
   - 简化存储路径生成,统一使用hash前缀

Fixes: 视频并发处理竞争条件、数据库存储冲突、聊天循环变量错误
This commit is contained in:
雅诺狐
2025-08-22 23:55:54 +08:00
parent 4ee894913e
commit 1f07104181
10 changed files with 279 additions and 623 deletions

1
bot.py
View File

@@ -1,4 +1,5 @@
#import asyncio
import asyncio
import hashlib
import os
import sys

View File

@@ -26,6 +26,7 @@ dependencies = [
"json5>=0.12.1",
"jsonlines>=4.0.0",
"langfuse==2.46.2",
"lunar-python>=1.4.4",
"lxml>=6.0.0",
"maim-message>=0.3.8",
"matplotlib>=3.10.3",

View File

@@ -174,6 +174,9 @@ class CycleProcessor:
- 在FOCUS模式下同步生成确保及时响应
- 发送生成的回复并结束循环
"""
# 初始化reply_to_str以避免UnboundLocalError
reply_to_str = None
if self.context.loop_mode == ChatMode.NORMAL:
if not gen_task:
reply_to_str = await self._build_reply_to_str(message_data)
@@ -185,6 +188,11 @@ class CycleProcessor:
request_type="chat.replyer.normal",
)
)
else:
# 如果gen_task已存在但reply_to_str还未构建需要构建它
if reply_to_str is None:
reply_to_str = await self._build_reply_to_str(message_data)
try:
response_set = await asyncio.wait_for(gen_task, timeout=global_config.chat.thinking_timeout)
except asyncio.TimeoutError:

View File

@@ -11,7 +11,7 @@ from maim_message import Seg, UserInfo, BaseMessageInfo, MessageBase
from src.common.logger import get_logger
from src.chat.utils.utils_image import get_image_manager
from src.chat.utils.utils_voice import get_voice_text
from src.chat.utils.utils_video import get_video
from src.chat.utils.utils_video import get_video_analyzer
from src.config.config import global_config
from .chat_stream import ChatStream
@@ -196,7 +196,6 @@ class MessageRecv(Message):
self.is_emoji = False
self.is_voice = False
logger.info(f"接收到视频消息,数据类型: {type(segment.data)}")
logger.debug(f"视频数据内容: {segment.data}")
if global_config.video_analysis.enable:
logger.info("已启用视频识别,开始识别")
if isinstance(segment.data, dict):
@@ -214,7 +213,7 @@ class MessageRecv(Message):
logger.info(f"解码后视频大小: {len(video_bytes)} 字节")
# 使用video analyzer分析视频
video_analyzer = get_video()
video_analyzer = get_video_analyzer()
result = await video_analyzer.analyze_video_from_bytes(
video_bytes,
filename,
@@ -372,7 +371,6 @@ class MessageRecvS4U(MessageRecv):
self.is_emoji = False
logger.info(f"接收到视频消息,数据类型: {type(segment.data)}")
logger.debug(f"视频数据内容: {segment.data}")
if global_config.video_analysis.enable:
logger.info("已启用视频识别,开始识别")
if isinstance(segment.data, dict):
@@ -390,7 +388,7 @@ class MessageRecvS4U(MessageRecv):
logger.info(f"解码后视频大小: {len(video_bytes)} 字节")
# 使用video analyzer分析视频
video_analyzer = get_video()
video_analyzer = get_video_analyzer()
result = await video_analyzer.analyze_video_from_bytes(
video_bytes,
filename,

View File

@@ -10,16 +10,27 @@ import cv2
import tempfile
import asyncio
import base64
import hashlib
import time
from PIL import Image
from pathlib import Path
from typing import List, Tuple, Dict
from typing import List, Tuple, Optional, Dict
import io
from src.llm_models.utils_model import LLMRequest
from src.config.config import global_config, model_config
from src.common.logger import get_logger
from src.common.database.sqlalchemy_models import get_db_session, Videos
logger = get_logger("src.multimodal.video_analyzer")
logger = get_logger("utils_video")
# 全局正在处理的视频哈希集合,用于防止重复处理
processing_videos = set()
processing_lock = asyncio.Lock()
# 为每个视频hash创建独立的锁和事件
video_locks = {}
video_events = {}
video_lock_manager = asyncio.Lock()
class VideoAnalyzer:
@@ -30,16 +41,17 @@ class VideoAnalyzer:
# 使用专用的视频分析配置
try:
self.video_llm = LLMRequest(
model_set=model_config.model_task_config.utils_video,
request_type="utils_video"
model_set=model_config.model_task_config.video_analysis,
request_type="video_analysis"
)
logger.info("✅ 使用video_analysis模型配置")
except (AttributeError, KeyError) as e:
# 如果utils_video不存在使用vlm配置
# 如果video_analysis不存在使用vlm配置
self.video_llm = LLMRequest(
model_set=model_config.model_task_config.vlm,
request_type="vlm"
)
logger.warning(f"utils_video配置不可用({e})回退使用vlm配置")
logger.warning(f"video_analysis配置不可用({e})回退使用vlm配置")
# 从配置文件读取参数,如果配置不存在则使用默认值
try:
@@ -70,7 +82,7 @@ class VideoAnalyzer:
except AttributeError as e:
# 如果配置不存在,使用代码中的默认值
logger.warning(f"配置文件中缺少utils_video配置({e}),使用默认值")
logger.warning(f"配置文件中缺少video_analysis配置({e}),使用默认值")
self.max_frames = 6
self.frame_quality = 85
self.max_image_size = 600
@@ -97,6 +109,70 @@ class VideoAnalyzer:
logger.info(f"✅ 视频分析器初始化完成,分析模式: {self.analysis_mode}")
def _calculate_video_hash(self, video_data: bytes) -> str:
"""计算视频文件的hash值"""
hash_obj = hashlib.sha256()
hash_obj.update(video_data)
return hash_obj.hexdigest()
def _check_video_exists(self, video_hash: str) -> Optional[Videos]:
    """Look up a previously analyzed video by its hash.

    Returns the matching Videos row, or None on a miss or on any
    database error (errors are logged, never raised to the caller).
    """
    try:
        with get_db_session() as session:
            # Drop cached state so commits made by other sessions/tasks
            # (e.g. a concurrent analysis of the same video) are visible.
            session.expire_all()
            query = session.query(Videos).filter(Videos.video_hash == video_hash)
            return query.first()
    except Exception as e:
        logger.warning(f"检查视频是否存在时出错: {e}")
        return None
def _store_video_result(self, video_hash: str, description: str, metadata: Optional[Dict] = None) -> Optional[Videos]:
    """Persist an analysis result, keyed solely by video hash.

    If a row with this hash already exists its description, count and
    timestamp (plus any metadata fields) are updated; otherwise a new
    row is inserted with count=1.

    Args:
        video_hash: SHA-256 hex digest of the raw video bytes.
        description: Analysis summary text to store.
        metadata: Optional dict with 'duration', 'frame_count', 'fps',
            'resolution' and 'file_size' keys.

    Returns:
        The persisted Videos row, or None when a database error occurred.
    """
    try:
        with get_db_session() as session:
            # Look up by video_hash only (path-based uniqueness was removed).
            existing_video = session.query(Videos).filter(
                Videos.video_hash == video_hash
            ).first()
            if existing_video:
                # Row already exists: refresh description, bump the hit
                # counter and the timestamp.
                existing_video.description = description
                existing_video.count += 1
                existing_video.timestamp = time.time()
                if metadata:
                    # NOTE(review): .get() overwrites a field with None when
                    # the key is absent from metadata — confirm intended.
                    existing_video.duration = metadata.get('duration')
                    existing_video.frame_count = metadata.get('frame_count')
                    existing_video.fps = metadata.get('fps')
                    existing_video.resolution = metadata.get('resolution')
                    existing_video.file_size = metadata.get('file_size')
                session.commit()
                # Refresh so the returned object carries committed values.
                session.refresh(existing_video)
                logger.info(f"✅ 更新已存在的视频记录hash: {video_hash[:16]}..., count: {existing_video.count}")
                return existing_video
            else:
                # First time we see this hash: insert a fresh row.
                video_record = Videos(
                    video_hash=video_hash,
                    description=description,
                    timestamp=time.time(),
                    count=1
                )
                if metadata:
                    video_record.duration = metadata.get('duration')
                    video_record.frame_count = metadata.get('frame_count')
                    video_record.fps = metadata.get('fps')
                    video_record.resolution = metadata.get('resolution')
                    video_record.file_size = metadata.get('file_size')
                session.add(video_record)
                session.commit()
                session.refresh(video_record)
                logger.info(f"✅ 新视频分析结果已保存到数据库hash: {video_hash[:16]}...")
                return video_record
    except Exception as e:
        logger.error(f"❌ 存储视频分析结果时出错: {e}")
        return None
def set_analysis_mode(self, mode: str):
"""设置分析模式"""
if mode in ["batch", "sequential", "auto"]:
@@ -150,7 +226,7 @@ class VideoAnalyzer:
frames.append((frame_base64, timestamp))
extracted_count += 1
logger.debug(f"📸 提取第{extracted_count}帧 (时间: {timestamp:.2f}s)")
logger.debug(f"提取第{extracted_count}帧 (时间: {timestamp:.2f}s)")
frame_count += 1
@@ -162,6 +238,9 @@ class VideoAnalyzer:
"""批量分析所有帧"""
logger.info(f"开始批量分析{len(frames)}")
if not frames:
return "❌ 没有可分析的帧"
# 构建提示词
prompt = self.batch_analysis_prompt
@@ -169,28 +248,77 @@ class VideoAnalyzer:
prompt += f"\n\n用户问题: {user_question}"
# 添加帧信息到提示词
frame_info = []
for i, (_frame_base64, timestamp) in enumerate(frames):
if self.enable_frame_timing:
prompt += f"\n\n{i+1}帧 (时间: {timestamp:.2f}s):"
frame_info.append(f"{i+1}帧 (时间: {timestamp:.2f}s)")
else:
frame_info.append(f"{i+1}")
prompt += f"\n\n视频包含{len(frames)}帧图像:{', '.join(frame_info)}"
prompt += "\n\n请基于所有提供的帧图像进行综合分析,关注并描述视频的完整内容和故事发展。"
try:
# 使用第一帧进行分析(批量模式暂时使用单帧,后续可以优化为真正的多图片分析
if frames:
frame_base64, _ = frames[0]
prompt += f"\n\n注意当前显示的是第1帧请基于这一帧和提示词进行分析。视频共有{len(frames)}帧。"
# 尝试使用多图片分析
response = await self._analyze_multiple_frames(frames, prompt)
logger.info("✅ 视频识别完成")
return response
except Exception as e:
logger.error(f"❌ 视频识别失败: {e}")
# 降级到单帧分析
logger.warning("降级到单帧分析模式")
try:
frame_base64, timestamp = frames[0]
fallback_prompt = prompt + f"\n\n注意由于技术限制当前仅显示第1帧 (时间: {timestamp:.2f}s),视频共有{len(frames)}帧。请基于这一帧进行分析。"
response, _ = await self.video_llm.generate_response_for_image(
prompt=prompt,
prompt=fallback_prompt,
image_base64=frame_base64,
image_format="jpeg"
)
logger.info("批量分析完成")
logger.info("降级的单帧分析完成")
return response
else:
return "❌ 没有可分析的帧"
except Exception as e:
logger.error(f"❌ 批量分析失败: {e}")
raise
except Exception as fallback_e:
logger.error(f"❌ 降级分析也失败: {fallback_e}")
raise
async def _analyze_multiple_frames(self, frames: List[Tuple[str, float]], prompt: str) -> str:
    """Send all extracted frames to the LLM in one multi-image request.

    Args:
        frames: (base64_jpeg, timestamp_seconds) tuples, in video order.
        prompt: Full analysis prompt already containing frame metadata.

    Returns:
        The model's text response, or a fallback error string when the
        response carries no content.
    """
    logger.info(f"开始构建包含{len(frames)}帧的分析请求")
    # Imported locally to avoid a module-level import cycle with the LLM layer.
    from src.llm_models.payload_content.message import MessageBuilder, RoleType
    from src.llm_models.utils_model import RequestType
    # One user message: the text prompt followed by every frame as an image part.
    message_builder = MessageBuilder().set_role(RoleType.User).add_text_content(prompt)
    for _i, (frame_base64, _timestamp) in enumerate(frames):
        message_builder.add_image_content("jpeg", frame_base64)
    message = message_builder.build()
    # NOTE(review): uses LLMRequest private helpers (_select_model /
    # _execute_request) because the public API has no multi-image call —
    # keep in sync with utils_model internals.
    model_info, api_provider, client = self.video_llm._select_model()
    api_response = await self.video_llm._execute_request(
        api_provider=api_provider,
        client=client,
        request_type=RequestType.RESPONSE,
        model_info=model_info,
        message_list=[message],
        temperature=None,
        max_tokens=None
    )
    logger.info(f"视频识别完成,响应长度: {len(api_response.content or '')} ")
    return api_response.content or "❌ 未获得响应内容"
async def analyze_frames_sequential(self, frames: List[Tuple[str, float]], user_question: str = None) -> str:
"""逐帧分析并汇总"""
@@ -291,13 +419,16 @@ class VideoAnalyzer:
Args:
video_bytes: 视频字节数据
filename: 文件名(可选)
filename: 文件名(可选,仅用于日志
user_question: 用户问题(旧参数名,保持兼容性)
prompt: 提示词(新参数名,与系统调用保持一致)
Returns:
Dict[str, str]: 包含分析结果的字典,格式为 {"summary": "分析结果"}
"""
video_hash = None
video_event = None
try:
logger.info("开始从字节数据分析视频")
@@ -308,7 +439,52 @@ class VideoAnalyzer:
if not video_bytes:
return {"summary": "❌ 视频数据为空"}
# 创建临时文件保存视频数据
# 计算视频hash值
video_hash = self._calculate_video_hash(video_bytes)
logger.info(f"视频hash: {video_hash}")
# 改进的并发控制:使用每个视频独立的锁和事件
async with video_lock_manager:
if video_hash not in video_locks:
video_locks[video_hash] = asyncio.Lock()
video_events[video_hash] = asyncio.Event()
video_lock = video_locks[video_hash]
video_event = video_events[video_hash]
# 尝试获取该视频的专用锁
if video_lock.locked():
logger.info(f"⏳ 相同视频正在处理中,等待处理完成... (hash: {video_hash[:16]}...)")
try:
# 等待处理完成的事件信号最多等待60秒
await asyncio.wait_for(video_event.wait(), timeout=60.0)
logger.info(f"✅ 等待结束,检查是否有处理结果")
# 检查是否有结果了
existing_video = self._check_video_exists(video_hash)
if existing_video:
logger.info(f"✅ 找到了处理结果,直接返回 (id: {existing_video.id})")
return {"summary": existing_video.description}
else:
logger.warning(f"⚠️ 等待完成但未找到结果,可能处理失败")
except asyncio.TimeoutError:
logger.warning(f"⚠️ 等待超时(60秒),放弃等待")
# 获取锁开始处理
async with video_lock:
logger.info(f"🔒 获得视频处理锁,开始处理 (hash: {video_hash[:16]}...)")
# 再次检查数据库(可能在等待期间已经有结果了)
existing_video = self._check_video_exists(video_hash)
if existing_video:
logger.info(f"✅ 获得锁后发现已有结果,直接返回 (id: {existing_video.id})")
video_event.set() # 通知其他等待者
return {"summary": existing_video.description}
# 未找到已存在记录,开始新的分析
logger.info("未找到已存在的视频记录,开始新的分析")
# 创建临时文件进行分析
with tempfile.NamedTemporaryFile(delete=False, suffix='.mp4') as temp_file:
temp_file.write(video_bytes)
temp_path = temp_file.name
@@ -316,23 +492,70 @@ class VideoAnalyzer:
try:
# 检查临时文件是否创建成功
if not os.path.exists(temp_path):
video_event.set() # 通知等待者
return {"summary": "❌ 临时文件创建失败"}
# 使用临时文件进行分析
result = await self.analyze_video(temp_path, question)
return {"summary": result}
finally:
# 清理临时文件
try:
if os.path.exists(temp_path):
os.unlink(temp_path)
logger.debug("临时文件已清理")
except Exception as e:
logger.warning(f"清理临时文件失败: {e}")
if os.path.exists(temp_path):
os.unlink(temp_path)
# 保存分析结果到数据库
metadata = {
"filename": filename,
"file_size": len(video_bytes),
"analysis_timestamp": time.time()
}
self._store_video_result(
video_hash=video_hash,
description=result,
metadata=metadata
)
# 处理完成,通知等待者并清理资源
video_event.set()
async with video_lock_manager:
# 清理资源
video_locks.pop(video_hash, None)
video_events.pop(video_hash, None)
return {"summary": result}
except Exception as e:
error_msg = f"❌ 从字节数据分析视频失败: {str(e)}"
logger.error(error_msg)
# 即使失败也保存错误信息到数据库,避免重复处理
try:
metadata = {
"filename": filename,
"file_size": len(video_bytes),
"analysis_timestamp": time.time(),
"error": str(e)
}
self._store_video_result(
video_hash=video_hash,
description=error_msg,
metadata=metadata
)
logger.info("✅ 错误信息已保存到数据库")
except Exception as store_e:
logger.error(f"❌ 保存错误信息失败: {store_e}")
# 处理失败,通知等待者并清理资源
try:
if video_hash and video_event:
async with video_lock_manager:
if video_hash in video_events:
video_events[video_hash].set()
video_locks.pop(video_hash, None)
video_events.pop(video_hash, None)
except Exception as cleanup_e:
logger.error(f"❌ 清理锁资源失败: {cleanup_e}")
return {"summary": error_msg}
def is_supported_video(self, file_path: str) -> bool:
@@ -344,8 +567,8 @@ class VideoAnalyzer:
# 全局实例
_video_analyzer = None
def get_video() -> VideoAnalyzer:
"""获取视频分析器实例"""
def get_video_analyzer() -> VideoAnalyzer:
"""获取视频分析器实例(单例模式)"""
global _video_analyzer
if _video_analyzer is None:
_video_analyzer = VideoAnalyzer()

View File

@@ -225,7 +225,6 @@ class Videos(Base):
video_id = Column(Text, nullable=False, default="")
video_hash = Column(get_string_field(64), nullable=False, index=True, unique=True)
description = Column(Text, nullable=True)
path = Column(get_string_field(500), nullable=False, unique=True)
count = Column(Integer, nullable=False, default=1)
timestamp = Column(Float, nullable=False)
vlm_processed = Column(Boolean, nullable=False, default=False)
@@ -239,7 +238,6 @@ class Videos(Base):
__table_args__ = (
Index('idx_videos_video_hash', 'video_hash'),
Index('idx_videos_path', 'path'),
Index('idx_videos_timestamp', 'timestamp'),
)

View File

@@ -439,7 +439,7 @@ MODULE_COLORS = {
# 聊天和多媒体扩展
"chat_voice": "\033[38;5;87m", # 浅青色
"typo_gen": "\033[38;5;123m", # 天蓝色
"src.multimodal.video_analyzer": "\033[38;5;75m", # 亮蓝色
"utils_video": "\033[38;5;75m", # 亮蓝色
"ReplyerManager": "\033[38;5;173m", # 浅橙色
"relationship_builder_manager": "\033[38;5;176m", # 浅紫色
"expression_selector": "\033[38;5;176m",
@@ -542,7 +542,7 @@ MODULE_ALIASES = {
# 聊天和多媒体扩展
"chat_voice": "语音处理",
"typo_gen": "错字生成",
"src.multimodal.video_analyzer": "视频分析",
"src.chat.utils.utils_video": "视频分析",
"ReplyerManager": "回复管理",
"relationship_builder_manager": "关系管理",

View File

@@ -1,10 +0,0 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
多模态处理模块
包含图像、视频、音频等多媒体内容的处理工具
"""
# 由于video_analyzer.py文件存在但可能有循环导入或其他问题
# 暂时不在__init__.py中导入让使用者直接导入
pass

View File

@@ -1,571 +0,0 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
视频分析器模块 - 优化版本
支持多种分析模式:批处理、逐帧、自动选择
"""
import os
import cv2
import tempfile
import asyncio
import base64
import hashlib
import time
from PIL import Image
from pathlib import Path
from typing import List, Tuple, Optional, Dict
import io
from src.llm_models.utils_model import LLMRequest
from src.config.config import global_config, model_config
from src.common.logger import get_logger
from src.common.database.sqlalchemy_models import get_db_session, Videos
logger = get_logger("src.multimodal.video_analyzer")
class VideoAnalyzer:
"""优化的视频分析器类"""
def __init__(self):
    """Initialize the video analyzer: LLM client, frame/analysis settings and prompts."""
    # Logger first so every later branch can report.
    self.logger = get_logger(__name__)
    # Prefer the dedicated video_analysis model configuration.
    try:
        self.video_llm = LLMRequest(
            model_set=model_config.model_task_config.video_analysis,
            request_type="video_analysis"
        )
        self.logger.info("✅ 使用video_analysis模型配置")
    except (AttributeError, KeyError) as e:
        # Fall back to the generic VLM configuration when video_analysis
        # is missing from the model task config.
        self.video_llm = LLMRequest(
            model_set=model_config.model_task_config.vlm,
            request_type="vlm"
        )
        self.logger.warning(f"video_analysis配置不可用({e})回退使用vlm配置")
    # Read tuning parameters from the config file, with hard-coded fallbacks.
    try:
        config = global_config.video_analysis
        self.max_frames = config.max_frames
        self.frame_quality = config.frame_quality
        self.max_image_size = config.max_image_size
        self.enable_frame_timing = config.enable_frame_timing
        self.batch_analysis_prompt = config.batch_analysis_prompt
        # Map config-file mode names onto the internal mode names.
        config_mode = config.analysis_mode
        if config_mode == "batch_frames":
            self.analysis_mode = "batch"
        elif config_mode == "frame_by_frame":
            self.analysis_mode = "sequential"
        elif config_mode == "auto":
            self.analysis_mode = "auto"
        else:
            self.logger.warning(f"无效的分析模式: {config_mode}使用默认的auto模式")
            self.analysis_mode = "auto"
        self.frame_analysis_delay = 0.3  # delay between per-frame API calls (s)
        self.frame_interval = 1.0  # frame sampling interval (s)
        self.batch_size = 3  # frames per batch in batch mode
        self.timeout = 60.0  # analysis timeout (s)
        self.logger.info("✅ 从配置文件读取视频分析参数")
    except AttributeError as e:
        # Config section absent entirely: use in-code defaults.
        self.logger.warning(f"配置文件中缺少video_analysis配置({e}),使用默认值")
        self.max_frames = 6
        self.frame_quality = 85
        self.max_image_size = 600
        self.analysis_mode = "auto"
        self.frame_analysis_delay = 0.3
        self.frame_interval = 1.0  # frame sampling interval (s)
        self.batch_size = 3  # frames per batch in batch mode
        self.timeout = 60.0  # analysis timeout (s)
        self.enable_frame_timing = True
        self.batch_analysis_prompt = """请分析这个视频的内容。这些图片是从视频中按时间顺序提取的关键帧。
请提供详细的分析,包括:
1. 视频的整体内容和主题
2. 主要人物、对象和场景描述
3. 动作、情节和时间线发展
4. 视觉风格和艺术特点
5. 整体氛围和情感表达
6. 任何特殊的视觉效果或文字内容
请用中文回答,分析要详细准确。"""
    # System prompt for the LLM conversation.
    self.system_prompt = "你是一个专业的视频内容分析助手。请仔细观察用户提供的视频关键帧,详细描述视频内容。"
    self.logger.info(f"✅ 视频分析器初始化完成,分析模式: {self.analysis_mode}")
def _calculate_video_hash(self, video_data: bytes) -> str:
"""计算视频文件的hash值"""
hash_obj = hashlib.sha256()
hash_obj.update(video_data)
return hash_obj.hexdigest()
def _check_video_exists(self, video_hash: str) -> Optional[Videos]:
    """Return the stored Videos row whose hash matches, or None on miss/error."""
    try:
        with get_db_session() as session:
            match = session.query(Videos).filter(Videos.video_hash == video_hash)
            return match.first()
    except Exception as e:
        self.logger.warning(f"检查视频是否存在时出错: {e}")
        return None
def _check_video_exists_by_features(self, duration: float, frame_count: int, fps: float, tolerance: float = 0.1) -> Optional[Videos]:
    """Find a previously analyzed video with near-identical playback features.

    Intended to catch re-encoded copies of the same video whose byte
    hash differs but whose duration/frame count/fps match.

    Args:
        duration: Video length in seconds.
        frame_count: Exact total frame count (must match exactly).
        fps: Frames per second.
        tolerance: Allowed absolute difference for duration and fps.

    Returns:
        The first matching Videos row, or None on miss or database error.
    """
    try:
        with get_db_session() as session:
            # NOTE(review): loads every row with non-null features and scans
            # in Python — O(n) over the table; consider filtering in SQL.
            similar_videos = session.query(Videos).filter(
                Videos.duration.isnot(None),
                Videos.frame_count.isnot(None),
                Videos.fps.isnot(None)
            ).all()
            for video in similar_videos:
                if (video.duration and video.frame_count and video.fps and
                    abs(video.duration - duration) <= tolerance and
                    video.frame_count == frame_count and
                    abs(video.fps - fps) <= tolerance + 1e-6):  # epsilon guards float rounding
                    self.logger.info(f"根据视频特征找到相似视频: duration={video.duration:.2f}s, frames={video.frame_count}, fps={video.fps:.2f}")
                    return video
            return None
    except Exception as e:
        self.logger.warning(f"根据特征检查视频时出错: {e}")
        return None
def _store_video_result(self, video_hash: str, description: str, path: str = "", metadata: Optional[Dict] = None) -> Optional[Videos]:
    """Persist an analysis result keyed by video hash or storage path.

    Updates the existing row when either the hash or the path already
    exists (both columns are unique); otherwise inserts a new row.

    Args:
        video_hash: SHA-256 hex digest of the raw video bytes.
        description: Analysis summary text to store.
        path: Storage path; a hash-derived placeholder is used when empty.
        metadata: Optional dict with 'duration', 'frame_count', 'fps',
            'resolution' and 'file_size' keys.

    Returns:
        The persisted Videos row, or None when a database error occurred.
    """
    try:
        with get_db_session() as session:
            # Synthesize a unique placeholder path when none was given,
            # since the path column is NOT NULL + unique.
            if not path:
                path = f"video_{video_hash[:16]}.unknown"
            # Match on either unique column to avoid IntegrityError on insert.
            existing_video = session.query(Videos).filter(
                (Videos.video_hash == video_hash) | (Videos.path == path)
            ).first()
            if existing_video:
                # Row exists: refresh description, bump counter and timestamp.
                existing_video.description = description
                existing_video.count += 1
                existing_video.timestamp = time.time()
                if metadata:
                    # NOTE(review): missing keys overwrite fields with None.
                    existing_video.duration = metadata.get('duration')
                    existing_video.frame_count = metadata.get('frame_count')
                    existing_video.fps = metadata.get('fps')
                    existing_video.resolution = metadata.get('resolution')
                    existing_video.file_size = metadata.get('file_size')
                session.commit()
                session.refresh(existing_video)
                self.logger.info(f"✅ 更新已存在的视频记录hash: {video_hash[:16]}..., count: {existing_video.count}")
                return existing_video
            else:
                # No match: create a fresh record.
                video_record = Videos(
                    video_hash=video_hash,
                    description=description,
                    path=path,
                    timestamp=time.time(),
                    count=1
                )
                if metadata:
                    video_record.duration = metadata.get('duration')
                    video_record.frame_count = metadata.get('frame_count')
                    video_record.fps = metadata.get('fps')
                    video_record.resolution = metadata.get('resolution')
                    video_record.file_size = metadata.get('file_size')
                session.add(video_record)
                session.commit()
                session.refresh(video_record)
                self.logger.info(f"✅ 新视频分析结果已保存到数据库hash: {video_hash[:16]}...")
                return video_record
    except Exception as e:
        self.logger.error(f"❌ 存储视频分析结果时出错: {e}")
        return None
def _update_video_count(self, video_id: int) -> bool:
    """Increment the analysis counter on one stored video.

    Args:
        video_id: Primary key of the Videos row to update.

    Returns:
        True when the row was found and updated, False otherwise
        (missing row or database error — both only logged).
    """
    try:
        with get_db_session() as session:
            record = session.query(Videos).filter(Videos.id == video_id).first()
            if not record:
                self.logger.warning(f"⚠️ 未找到ID为 {video_id} 的视频记录")
                return False
            record.count += 1
            session.commit()
            self.logger.info(f"✅ 视频分析计数已更新ID: {video_id}, 新计数: {record.count}")
            return True
    except Exception as e:
        self.logger.error(f"❌ 更新视频分析计数时出错: {e}")
        return False
def set_analysis_mode(self, mode: str):
    """Switch the analyzer between 'batch', 'sequential' and 'auto'; invalid modes are rejected with a warning."""
    valid_modes = ("batch", "sequential", "auto")
    if mode not in valid_modes:
        self.logger.warning(f"无效的分析模式: {mode}")
        return
    self.analysis_mode = mode
async def extract_frames(self, video_path: str) -> List[Tuple[str, float]]:
    """Extract up to max_frames evenly spaced key frames from a video file.

    Each selected frame is converted to RGB, downscaled so its longest
    side is at most max_image_size, and JPEG-encoded to base64.

    Args:
        video_path: Path to a local video file readable by OpenCV.

    Returns:
        List of (base64_jpeg, timestamp_seconds) tuples in video order.
    """
    frames = []
    cap = cv2.VideoCapture(video_path)
    fps = cap.get(cv2.CAP_PROP_FPS)
    total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
    duration = total_frames / fps if fps > 0 else 0
    self.logger.info(f"视频信息: {total_frames}帧, {fps:.2f}FPS, {duration:.2f}秒")
    # Spread max_frames samples evenly across the whole duration.
    if duration > 0:
        frame_interval = max(1, int(duration / self.max_frames * fps))
    else:
        frame_interval = 30  # fallback sampling interval when duration is unknown
    frame_count = 0
    extracted_count = 0
    while cap.isOpened() and extracted_count < self.max_frames:
        ret, frame = cap.read()
        if not ret:
            break
        if frame_count % frame_interval == 0:
            # OpenCV delivers BGR; convert to RGB for PIL.
            frame_rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
            pil_image = Image.fromarray(frame_rgb)
            # Downscale proportionally when the longest side exceeds the cap.
            if max(pil_image.size) > self.max_image_size:
                ratio = self.max_image_size / max(pil_image.size)
                new_size = tuple(int(dim * ratio) for dim in pil_image.size)
                pil_image = pil_image.resize(new_size, Image.Resampling.LANCZOS)
            # JPEG-encode into base64 for the LLM payload.
            buffer = io.BytesIO()
            pil_image.save(buffer, format='JPEG', quality=self.frame_quality)
            frame_base64 = base64.b64encode(buffer.getvalue()).decode('utf-8')
            # Timestamp of this frame within the video.
            timestamp = frame_count / fps if fps > 0 else 0
            frames.append((frame_base64, timestamp))
            extracted_count += 1
            self.logger.debug(f"提取第{extracted_count}帧 (时间: {timestamp:.2f}s)")
        frame_count += 1
    cap.release()
    self.logger.info(f"✅ 成功提取{len(frames)}帧")
    return frames
async def analyze_frames_batch(self, frames: List[Tuple[str, float]], user_question: str = None) -> str:
    """Analyze all frames together in a single multi-image LLM request.

    Falls back to a single-frame analysis (first frame only) when the
    multi-image request fails; re-raises when the fallback also fails.

    Args:
        frames: (base64_jpeg, timestamp_seconds) tuples in video order.
        user_question: Optional question appended to the prompt.

    Returns:
        The model's analysis text.
    """
    self.logger.info(f"开始批量分析{len(frames)}帧")
    if not frames:
        return "❌ 没有可分析的帧"
    # Build the prompt from the configured template.
    prompt = self.batch_analysis_prompt
    if user_question:
        prompt += f"\n\n用户问题: {user_question}"
    # Describe each frame (with timestamps when enabled) in the prompt.
    frame_info = []
    for i, (_frame_base64, timestamp) in enumerate(frames):
        if self.enable_frame_timing:
            frame_info.append(f"第{i+1}帧 (时间: {timestamp:.2f}s)")
        else:
            frame_info.append(f"第{i+1}帧")
    prompt += f"\n\n视频包含{len(frames)}帧图像:{', '.join(frame_info)}"
    prompt += "\n\n请基于所有提供的帧图像进行综合分析,关注并描述视频的完整内容和故事发展。"
    try:
        # Preferred path: send all frames in one request.
        response = await self._analyze_multiple_frames(frames, prompt)
        self.logger.info("✅ 视频识别完成")
        return response
    except Exception as e:
        self.logger.error(f"❌ 视频识别失败: {e}")
        # Degrade gracefully to analyzing only the first frame.
        self.logger.warning("降级到单帧分析模式")
        try:
            frame_base64, timestamp = frames[0]
            fallback_prompt = prompt + f"\n\n注意由于技术限制当前仅显示第1帧 (时间: {timestamp:.2f}s),视频共有{len(frames)}帧。请基于这一帧进行分析。"
            response, _ = await self.video_llm.generate_response_for_image(
                prompt=fallback_prompt,
                image_base64=frame_base64,
                image_format="jpeg"
            )
            self.logger.info("✅ 降级的单帧分析完成")
            return response
        except Exception as fallback_e:
            self.logger.error(f"❌ 降级分析也失败: {fallback_e}")
            raise
async def _analyze_multiple_frames(self, frames: List[Tuple[str, float]], prompt: str) -> str:
    """Send every frame to the LLM as one multi-image user message.

    Args:
        frames: (base64_jpeg, timestamp_seconds) tuples, in video order.
        prompt: Full analysis prompt already containing frame metadata.

    Returns:
        The model's text response, or a fallback error string when the
        response carries no content.
    """
    self.logger.info(f"开始构建包含{len(frames)}帧的分析请求")
    # Imported locally to avoid a module-level import cycle with the LLM layer.
    from src.llm_models.payload_content.message import MessageBuilder, RoleType
    from src.llm_models.utils_model import RequestType
    # One user message: the text prompt followed by each frame as an image part.
    message_builder = MessageBuilder().set_role(RoleType.User).add_text_content(prompt)
    for _i, (frame_base64, _timestamp) in enumerate(frames):
        message_builder.add_image_content("jpeg", frame_base64)
    message = message_builder.build()
    # NOTE(review): relies on LLMRequest private helpers (_select_model /
    # _execute_request) because the public API lacks a multi-image call.
    model_info, api_provider, client = self.video_llm._select_model()
    api_response = await self.video_llm._execute_request(
        api_provider=api_provider,
        client=client,
        request_type=RequestType.RESPONSE,
        model_info=model_info,
        message_list=[message],
        temperature=None,
        max_tokens=None
    )
    self.logger.info(f"视频识别完成,响应长度: {len(api_response.content or '')} ")
    return api_response.content or "❌ 未获得响应内容"
async def analyze_frames_sequential(self, frames: List[Tuple[str, float]], user_question: str = None) -> str:
    """Analyze frames one at a time, then ask the model for a summary.

    Per-frame failures are recorded inline rather than aborting the run.
    If even the final summary call fails, the concatenated per-frame
    results are returned instead.

    Args:
        frames: (base64_jpeg, timestamp_seconds) tuples in video order.
        user_question: Optional question emphasized in every prompt.

    Returns:
        The summary text, or the raw per-frame analyses on summary failure.
    """
    self.logger.info(f"开始逐帧分析{len(frames)}帧")
    frame_analyses = []
    for i, (frame_base64, timestamp) in enumerate(frames):
        try:
            prompt = f"请分析这个视频的第{i+1}帧"
            if self.enable_frame_timing:
                prompt += f" (时间: {timestamp:.2f}s)"
            prompt += "。描述你看到的内容,包括人物、动作、场景、文字等。"
            if user_question:
                prompt += f"\n特别关注: {user_question}"
            response, _ = await self.video_llm.generate_response_for_image(
                prompt=prompt,
                image_base64=frame_base64,
                image_format="jpeg"
            )
            frame_analyses.append(f"第{i+1}帧 ({timestamp:.2f}s): {response}")
            self.logger.debug(f"✅ 第{i+1}帧分析完成")
            # Throttle between API calls (skipped after the last frame).
            if i < len(frames) - 1:
                await asyncio.sleep(self.frame_analysis_delay)
        except Exception as e:
            # Keep going: record the failure for this frame and continue.
            self.logger.error(f"❌ 第{i+1}帧分析失败: {e}")
            frame_analyses.append(f"第{i+1}帧: 分析失败 - {e}")
    # Ask the model to synthesize the per-frame notes into one summary.
    self.logger.info("开始生成汇总分析")
    summary_prompt = f"""基于以下各帧的分析结果,请提供一个完整的视频内容总结:
{chr(10).join(frame_analyses)}
请综合所有帧的信息,描述视频的整体内容、故事线、主要元素和特点。"""
    if user_question:
        summary_prompt += f"\n特别回答用户的问题: {user_question}"
    try:
        # The last frame accompanies the summary request as visual context.
        if frames:
            last_frame_base64, _ = frames[-1]
            summary, _ = await self.video_llm.generate_response_for_image(
                prompt=summary_prompt,
                image_base64=last_frame_base64,
                image_format="jpeg"
            )
            self.logger.info("✅ 逐帧分析和汇总完成")
            return summary
        else:
            return "❌ 没有可用于汇总的帧"
    except Exception as e:
        self.logger.error(f"❌ 汇总分析失败: {e}")
        # Summary failed: fall back to the raw per-frame results.
        return f"视频逐帧分析结果:\n\n{chr(10).join(frame_analyses)}"
async def analyze_video(self, video_path: str, user_question: str = None) -> str:
    """Analyze a video file and return a textual description.

    Extracts key frames, chooses the batch or sequential strategy
    (decided by frame count when analysis_mode is "auto"), and returns
    either the analysis text or an error-message string — exceptions
    are never propagated to the caller.
    """
    try:
        self.logger.info(f"开始分析视频: {os.path.basename(video_path)}")
        frames = await self.extract_frames(video_path)
        if not frames:
            return "❌ 无法从视频中提取有效帧"
        mode = self.analysis_mode
        if mode == "auto":
            # Few frames fit one multi-image request; many are done one by one.
            mode = "batch" if len(frames) <= 3 else "sequential"
            self.logger.info(f"自动选择分析模式: {mode} (基于{len(frames)}帧)")
        if mode == "batch":
            result = await self.analyze_frames_batch(frames, user_question)
        else:
            result = await self.analyze_frames_sequential(frames, user_question)
        self.logger.info("✅ 视频分析完成")
        return result
    except Exception as e:
        error_msg = f"❌ 视频分析失败: {str(e)}"
        self.logger.error(error_msg)
        return error_msg
async def analyze_video_from_bytes(self, video_bytes: bytes, filename: str = None, user_question: str = None, prompt: str = None) -> Dict[str, str]:
    """Analyze a video given as raw bytes, with hash/feature deduplication.

    Flow: hash lookup → feature-similarity lookup (catches re-encoded
    copies) → full analysis via a temp file → persist result.

    Args:
        video_bytes: Raw video data.
        filename: Optional name, stored as the record path.
        user_question: User question (legacy parameter name).
        prompt: Prompt text (new parameter name; takes precedence).

    Returns:
        Dict of the form {"summary": "..."}; errors are reported in the
        summary text rather than raised.
    """
    try:
        # NOTE(review): mixes module-level `logger` and `self.logger`
        # in this method — unify when convenient.
        logger.info("开始从字节数据分析视频")
        # Back-compat: prefer `prompt` when supplied, else `user_question`.
        question = prompt if prompt is not None else user_question
        if not video_bytes:
            return {"summary": "❌ 视频数据为空"}
        # Fast path 1: exact byte-hash match in the database.
        video_hash = self._calculate_video_hash(video_bytes)
        self.logger.info(f"视频hash: {video_hash[:16]}... (完整长度: {len(video_hash)})")
        existing_video = self._check_video_exists(video_hash)
        if existing_video:
            self.logger.info(f"✅ 找到已存在的视频分析结果hash匹配直接返回 (id: {existing_video.id}, count: {existing_video.count})")
            return {"summary": existing_video.description}
        # Fast path 2: the same video re-encoded has a different hash but
        # near-identical duration/frame count/fps (experimental check).
        self.logger.info("未找到hash匹配的视频记录检查是否为重编码的相同视频测试功能")
        # Write bytes to a temp file so OpenCV can open them.
        with tempfile.NamedTemporaryFile(delete=False, suffix='.mp4') as temp_file:
            temp_file.write(video_bytes)
            temp_path = temp_file.name
        try:
            # Probe playback features of the incoming video.
            import cv2
            cap = cv2.VideoCapture(temp_path)
            fps = round(cap.get(cv2.CAP_PROP_FPS), 2)
            frame_count = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
            duration = round(frame_count / fps if fps > 0 else 0, 2)
            cap.release()
            self.logger.info(f"当前视频特征: 帧数={frame_count}, FPS={fps}, 时长={duration}秒")
            existing_similar_video = self._check_video_exists_by_features(duration, frame_count, fps)
            if existing_similar_video:
                self.logger.info(f"✅ 找到特征相似的视频分析结果,直接返回 (id: {existing_similar_video.id}, count: {existing_similar_video.count})")
                # Count this hit on the matched record.
                self._update_video_count(existing_similar_video.id)
                return {"summary": existing_similar_video.description}
            self.logger.info("未找到相似视频,开始新的分析")
            if not os.path.exists(temp_path):
                return {"summary": "❌ 临时文件创建失败"}
            # Slow path: run the full frame-extraction + LLM analysis.
            result = await self.analyze_video(temp_path, question)
        finally:
            # Always remove the temp file, whatever happened above.
            if os.path.exists(temp_path):
                os.unlink(temp_path)
        # Persist the new result so future uploads hit the fast paths.
        metadata = {
            "filename": filename,
            "file_size": len(video_bytes),
            "analysis_timestamp": time.time()
        }
        self._store_video_result(
            video_hash=video_hash,
            description=result,
            path=filename or "",
            metadata=metadata
        )
        return {"summary": result}
    except Exception as e:
        error_msg = f"❌ 从字节数据分析视频失败: {str(e)}"
        logger.error(error_msg)
        return {"summary": error_msg}
def is_supported_video(self, file_path: str) -> bool:
    """Return True when file_path carries a recognized video extension (case-insensitive)."""
    supported_formats = (
        '.mp4', '.avi', '.mov', '.mkv', '.flv', '.wmv', '.m4v', '.3gp', '.webm'
    )
    extension = Path(file_path).suffix.lower()
    return extension in supported_formats
# 全局实例
_video_analyzer = None
def get_video_analyzer() -> VideoAnalyzer:
    """Return the process-wide VideoAnalyzer singleton, creating it lazily on first use."""
    global _video_analyzer
    if _video_analyzer is None:
        _video_analyzer = VideoAnalyzer()
    return _video_analyzer

8
uv.lock generated
View File

@@ -1978,6 +1978,12 @@ wheels = [
{ url = "https://pypi.tuna.tsinghua.edu.cn/packages/0c/29/0348de65b8cc732daa3e33e67806420b2ae89bdce2b04af740289c5c6c8c/loguru-0.7.3-py3-none-any.whl", hash = "sha256:31a33c10c8e1e10422bfd431aeb5d351c7cf7fa671e3c4df004162264b28220c", size = 61595, upload-time = "2024-12-06T11:20:54.538Z" },
]
[[package]]
name = "lunar-python"
version = "1.4.4"
source = { registry = "https://pypi.tuna.tsinghua.edu.cn/simple" }
sdist = { url = "https://pypi.tuna.tsinghua.edu.cn/packages/fb/c1/93a993095234d51720e0179204c4b153adc11156ba1baa6b69213f4372d0/lunar_python-1.4.4.tar.gz", hash = "sha256:e14a8cc74a12d5628eaa50d33ec4feceeb44a2c44c74890b895a4e4f5ec17457", size = 106007, upload-time = "2025-04-29T08:14:13.847Z" }
[[package]]
name = "lxml"
version = "6.0.0"
@@ -2079,6 +2085,7 @@ dependencies = [
{ name = "json5" },
{ name = "jsonlines" },
{ name = "langfuse" },
{ name = "lunar-python" },
{ name = "lxml" },
{ name = "maim-message" },
{ name = "matplotlib" },
@@ -2157,6 +2164,7 @@ requires-dist = [
{ name = "json5", specifier = ">=0.12.1" },
{ name = "jsonlines", specifier = ">=4.0.0" },
{ name = "langfuse", specifier = "==2.46.2" },
{ name = "lunar-python", specifier = ">=1.4.4" },
{ name = "lxml", specifier = ">=6.0.0" },
{ name = "maim-message", specifier = ">=0.3.8" },
{ name = "matplotlib", specifier = ">=3.10.3" },