fix(db): 增强数据库会话管理的容错性
调整了 `get_db_session` 的行为,当数据库未能成功初始化时,它现在会返回 `None` 并记录错误,而不是抛出异常。这提高了应用在数据库连接不可用时的健壮性,避免了程序因无法获取会话而崩溃。 - `VideoAnalyzer` 已更新,增加了对会话为 `None` 的检查,以安全地跳过数据库读写操作。 - 附带对 `VideoAnalyzer` 和 `LegacyVideoAnalyzer` 进行了重构,将模型选择和API请求执行的逻辑抽象到独立的 `_model_selector` 和 `_executor` 组件中,提升了代码的清晰度和可维护性。
This commit is contained in:
@@ -209,6 +209,9 @@ class VideoAnalyzer:
|
|||||||
"""检查视频是否已经分析过"""
|
"""检查视频是否已经分析过"""
|
||||||
try:
|
try:
|
||||||
async with get_db_session() as session:
|
async with get_db_session() as session:
|
||||||
|
if not session:
|
||||||
|
logger.warning("无法获取数据库会话,跳过视频存在性检查。")
|
||||||
|
return None
|
||||||
# 明确刷新会话以确保看到其他事务的最新提交
|
# 明确刷新会话以确保看到其他事务的最新提交
|
||||||
await session.expire_all()
|
await session.expire_all()
|
||||||
stmt = select(Videos).where(Videos.video_hash == video_hash)
|
stmt = select(Videos).where(Videos.video_hash == video_hash)
|
||||||
@@ -229,6 +232,9 @@ class VideoAnalyzer:
|
|||||||
|
|
||||||
try:
|
try:
|
||||||
async with get_db_session() as session:
|
async with get_db_session() as session:
|
||||||
|
if not session:
|
||||||
|
logger.warning("无法获取数据库会话,跳过视频结果存储。")
|
||||||
|
return None
|
||||||
# 只根据video_hash查找
|
# 只根据video_hash查找
|
||||||
stmt = select(Videos).where(Videos.video_hash == video_hash)
|
stmt = select(Videos).where(Videos.video_hash == video_hash)
|
||||||
result = await session.execute(stmt)
|
result = await session.execute(stmt)
|
||||||
@@ -345,19 +351,59 @@ class VideoAnalyzer:
|
|||||||
prompt = self.batch_analysis_prompt.format(
|
prompt = self.batch_analysis_prompt.format(
|
||||||
personality_core=self.personality_core, personality_side=self.personality_side
|
personality_core=self.personality_core, personality_side=self.personality_side
|
||||||
)
|
)
|
||||||
if question:
|
|
||||||
prompt += f"\n用户关注: {question}"
|
if user_question:
|
||||||
desc = [
|
prompt += f"\n\n用户问题: {user_question}"
|
||||||
(f"第{i+1}帧 (时间: {ts:.2f}s)" if self.enable_frame_timing else f"第{i+1}帧")
|
|
||||||
for i, (_b, ts) in enumerate(frames)
|
# 添加帧信息到提示词
|
||||||
]
|
frame_info = []
|
||||||
prompt += "\n帧列表: " + ", ".join(desc)
|
for i, (_frame_base64, timestamp) in enumerate(frames):
|
||||||
mb = MessageBuilder().set_role(RoleType.User).add_text_content(prompt)
|
if self.enable_frame_timing:
|
||||||
for b64, _ in frames:
|
frame_info.append(f"第{i + 1}帧 (时间: {timestamp:.2f}s)")
|
||||||
mb.add_image_content("jpeg", b64)
|
else:
|
||||||
message = mb.build()
|
frame_info.append(f"第{i + 1}帧")
|
||||||
model_info, api_provider, client = self.video_llm._select_model()
|
|
||||||
resp = await self.video_llm._execute_request(
|
prompt += f"\n\n视频包含{len(frames)}帧图像:{', '.join(frame_info)}"
|
||||||
|
prompt += "\n\n请基于所有提供的帧图像进行综合分析,关注并描述视频的完整内容和故事发展。"
|
||||||
|
|
||||||
|
try:
|
||||||
|
# 使用多图片分析
|
||||||
|
response = await self._analyze_multiple_frames(frames, prompt)
|
||||||
|
logger.info("✅ 视频识别完成")
|
||||||
|
return response
|
||||||
|
|
||||||
|
except Exception as e:
|
||||||
|
logger.error(f"❌ 视频识别失败: {e}")
|
||||||
|
raise e
|
||||||
|
|
||||||
|
async def _analyze_multiple_frames(self, frames: List[Tuple[str, float]], prompt: str) -> str:
|
||||||
|
"""使用多图片分析方法"""
|
||||||
|
logger.info(f"开始构建包含{len(frames)}帧的分析请求")
|
||||||
|
|
||||||
|
# 导入MessageBuilder用于构建多图片消息
|
||||||
|
from src.llm_models.payload_content.message import MessageBuilder, RoleType
|
||||||
|
from src.llm_models.utils_model import RequestType
|
||||||
|
|
||||||
|
# 构建包含多张图片的消息
|
||||||
|
message_builder = MessageBuilder().set_role(RoleType.User).add_text_content(prompt)
|
||||||
|
|
||||||
|
# 添加所有帧图像
|
||||||
|
for _i, (frame_base64, _timestamp) in enumerate(frames):
|
||||||
|
message_builder.add_image_content("jpeg", frame_base64)
|
||||||
|
# logger.info(f"已添加第{i+1}帧到分析请求 (时间: {timestamp:.2f}s, 图片大小: {len(frame_base64)} chars)")
|
||||||
|
|
||||||
|
message = message_builder.build()
|
||||||
|
# logger.info(f"✅ 多帧消息构建完成,包含{len(frames)}张图片")
|
||||||
|
|
||||||
|
# 获取模型信息和客户端
|
||||||
|
selection_result = self.video_llm._model_selector.select_best_available_model(set(), "response")
|
||||||
|
if not selection_result:
|
||||||
|
raise RuntimeError("无法为视频分析选择可用模型。")
|
||||||
|
model_info, api_provider, client = selection_result
|
||||||
|
# logger.info(f"使用模型: {model_info.name} 进行多帧分析")
|
||||||
|
|
||||||
|
# 直接执行多图片请求
|
||||||
|
api_response = await self.video_llm._executor.execute_request(
|
||||||
api_provider=api_provider,
|
api_provider=api_provider,
|
||||||
client=client,
|
client=client,
|
||||||
request_type=RequestType.RESPONSE,
|
request_type=RequestType.RESPONSE,
|
||||||
|
|||||||
@@ -461,11 +461,14 @@ class LegacyVideoAnalyzer:
|
|||||||
# logger.info(f"✅ 多帧消息构建完成,包含{len(frames)}张图片")
|
# logger.info(f"✅ 多帧消息构建完成,包含{len(frames)}张图片")
|
||||||
|
|
||||||
# 获取模型信息和客户端
|
# 获取模型信息和客户端
|
||||||
model_info, api_provider, client = self.video_llm._select_model()
|
selection_result = self.video_llm._model_selector.select_best_available_model(set(), "response")
|
||||||
|
if not selection_result:
|
||||||
|
raise RuntimeError("无法为视频分析选择可用模型 (legacy)。")
|
||||||
|
model_info, api_provider, client = selection_result
|
||||||
# logger.info(f"使用模型: {model_info.name} 进行多帧分析")
|
# logger.info(f"使用模型: {model_info.name} 进行多帧分析")
|
||||||
|
|
||||||
# 直接执行多图片请求
|
# 直接执行多图片请求
|
||||||
api_response = await self.video_llm._execute_request(
|
api_response = await self.video_llm._executor.execute_request(
|
||||||
api_provider=api_provider,
|
api_provider=api_provider,
|
||||||
client=client,
|
client=client,
|
||||||
request_type=RequestType.RESPONSE,
|
request_type=RequestType.RESPONSE,
|
||||||
|
|||||||
@@ -759,30 +759,38 @@ async def initialize_database():
|
|||||||
|
|
||||||
|
|
||||||
@asynccontextmanager
|
@asynccontextmanager
|
||||||
async def get_db_session() -> AsyncGenerator[AsyncSession, None]:
|
async def get_db_session() -> AsyncGenerator[Optional[AsyncSession], None]:
|
||||||
"""异步数据库会话上下文管理器"""
|
"""
|
||||||
|
异步数据库会话上下文管理器。
|
||||||
|
在初始化失败时会yield None,调用方需要检查会话是否为None。
|
||||||
|
"""
|
||||||
session: Optional[AsyncSession] = None
|
session: Optional[AsyncSession] = None
|
||||||
|
SessionLocal = None
|
||||||
try:
|
try:
|
||||||
engine, SessionLocal = await initialize_database()
|
_, SessionLocal = await initialize_database()
|
||||||
if not SessionLocal:
|
if not SessionLocal:
|
||||||
raise RuntimeError("Database session not initialized")
|
logger.error("数据库会话工厂 (_SessionLocal) 未初始化。")
|
||||||
session = SessionLocal()
|
yield None
|
||||||
|
return
|
||||||
|
except Exception as e:
|
||||||
|
logger.error(f"数据库初始化失败,无法创建会话: {e}")
|
||||||
|
yield None
|
||||||
|
return
|
||||||
|
|
||||||
|
try:
|
||||||
|
session = SessionLocal()
|
||||||
# 对于 SQLite,在会话开始时设置 PRAGMA
|
# 对于 SQLite,在会话开始时设置 PRAGMA
|
||||||
from src.config.config import global_config
|
from src.config.config import global_config
|
||||||
if global_config.database.database_type == "sqlite":
|
if global_config.database.database_type == "sqlite":
|
||||||
try:
|
await session.execute(text("PRAGMA busy_timeout = 60000"))
|
||||||
await session.execute(text("PRAGMA busy_timeout = 60000"))
|
await session.execute(text("PRAGMA foreign_keys = ON"))
|
||||||
await session.execute(text("PRAGMA foreign_keys = ON"))
|
|
||||||
except Exception as e:
|
|
||||||
logger.warning(f"[SQLite] 设置会话 PRAGMA 失败: {e}")
|
|
||||||
|
|
||||||
yield session
|
yield session
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
logger.error(f"数据库会话错误: {e}")
|
logger.error(f"数据库会话期间发生错误: {e}")
|
||||||
if session:
|
if session:
|
||||||
await session.rollback()
|
await session.rollback()
|
||||||
raise
|
raise # 将会话期间的错误重新抛出给调用者
|
||||||
finally:
|
finally:
|
||||||
if session:
|
if session:
|
||||||
await session.close()
|
await session.close()
|
||||||
|
|||||||
Reference in New Issue
Block a user