diff --git a/TODO.md b/TODO.md index c55725b62..afdc43047 100644 --- a/TODO.md +++ b/TODO.md @@ -18,7 +18,7 @@ - [x] 添加表情包情感分析功能 - [x] 添加主动思考配置 - [x] 添加日程管理 -- [ ] 添加MCP SSE支持 +- [x] 添加MCP SSE支持 - [ ] 增加基于GPT-Sovits的多情感语音合成功能(插件形式) - [ ] 增加基于Open Voice的语音合成功能(插件形式) - [x] 对聊天信息的视频增加一个videoid(就像imageid一样) diff --git a/docs/MCP_SSE_USAGE.md b/docs/MCP_SSE_USAGE.md new file mode 100644 index 000000000..70fc9906b --- /dev/null +++ b/docs/MCP_SSE_USAGE.md @@ -0,0 +1,274 @@ +# MCP SSE 客户端使用指南 + +## 简介 + +MCP (Model Context Protocol) SSE (Server-Sent Events) 客户端支持通过SSE协议与MCP兼容的服务器进行通信。该客户端已集成到MoFox Bot的LLM模型客户端系统中。 + +## 功能特性 + +- ✅ 支持SSE流式响应 +- ✅ 支持多轮对话 +- ✅ 支持工具调用(Function Calling) +- ✅ 支持多模态内容(文本+图片) +- ✅ 自动处理中断信号 +- ✅ 完整的Token使用统计 + +## 配置方法 + +### 1. 安装依赖 + +依赖已自动添加到项目中: +```bash +pip install mcp>=0.9.0 sse-starlette>=2.2.1 +``` + +或使用uv: +```bash +uv sync +``` + +### 2. 配置API Provider + +在配置文件中添加MCP SSE provider: + +```python +# 在配置中添加 +api_providers = [ + { + "name": "mcp_provider", + "client_type": "mcp_sse", # 使用MCP SSE客户端 + "base_url": "https://your-mcp-server.com", + "api_key": "your-api-key", + "timeout": 60 + } +] +``` + +### 3. 配置模型 + +```python +models = [ + { + "name": "mcp_model", + "api_provider": "mcp_provider", + "model_identifier": "your-model-name", + "force_stream_mode": True # MCP SSE始终使用流式 + } +] +``` + +## 使用示例 + +### 基础对话 + +```python +from src.llm_models.model_client.base_client import client_registry +from src.llm_models.payload_content.message import Message, MessageBuilder, RoleType +from src.config.api_ada_configs import APIProvider, ModelInfo + +# 获取客户端 +api_provider = APIProvider( + name="mcp_provider", + client_type="mcp_sse", + base_url="https://your-mcp-server.com", + api_key="your-api-key" +) + +client = client_registry.get_client_class_instance(api_provider) + +# 构建消息 +messages = [ + MessageBuilder() + .set_role(RoleType.User) + .add_text_content("你好,请介绍一下你自己") + .build() +] + +# 获取响应 +model_info = ModelInfo( + name="mcp_model", + api_provider="mcp_provider", + model_identifier="your-model-name" +) + +response = await client.get_response( + model_info=model_info, + message_list=messages, + max_tokens=1024, + temperature=0.7 +) + +print(response.content) +``` + +### 使用工具调用 + +```python +from src.llm_models.payload_content.tool_option import ( + ToolOptionBuilder, + ToolParamType +) + +# 定义工具 +tools = [ + ToolOptionBuilder() + .set_name("get_weather") + .set_description("获取指定城市的天气信息") + .add_param( + name="city", + param_type=ToolParamType.STRING, + description="城市名称", + required=True + ) + .build() +] + +# 发送请求 +response = await client.get_response( + model_info=model_info, + message_list=messages, + tool_options=tools, + max_tokens=1024, + temperature=0.7 +) + +# 检查工具调用 +if response.tool_calls: + for tool_call in response.tool_calls: + print(f"调用工具: {tool_call.func_name}") + print(f"参数: {tool_call.args}") +``` + +### 多模态对话 + +```python +import base64 + +# 读取图片并编码 +with open("image.jpg", "rb") as f: + image_data = base64.b64encode(f.read()).decode("utf-8") + +# 构建多模态消息 +messages = [ + MessageBuilder() + .set_role(RoleType.User) + .add_text_content("这张图片里有什么?") + .add_image_content("jpg", image_data) + .build() +] + +response = await client.get_response( + model_info=model_info, + message_list=messages +) +``` + +### 中断处理 + +```python +import asyncio + +# 创建中断事件 +interrupt_flag = asyncio.Event() + +# 在另一个协程中设置中断 +async def interrupt_after_delay(): + await asyncio.sleep(5) + interrupt_flag.set() + +asyncio.create_task(interrupt_after_delay()) + +try: + response = await client.get_response( + model_info=model_info, + message_list=messages, + interrupt_flag=interrupt_flag + ) +except ReqAbortException: + print("请求被中断") +``` + +## MCP协议规范 + +MCP SSE客户端遵循以下协议规范: + +### 请求格式 + +```json +{ + "model": "model-name", + "messages": [ + { + "role": "user", + "content": "message content" + } + ], + "max_tokens": 1024, + "temperature": 0.7, + "stream": true, + "tools": [ + { + "name": "tool_name", + "description": "tool description", + "input_schema": { + "type": "object", + "properties": {...}, + "required": [...] + } + } + ] +} +``` + +### SSE事件类型 + +客户端处理以下SSE事件: + +1. **content_block_start** - 内容块开始 +2. **content_block_delta** - 内容块增量 +3. **content_block_stop** - 内容块结束 +4. **message_delta** - 消息元数据更新 +5. **message_stop** - 消息结束 + +## 限制说明 + +当前MCP SSE客户端的限制: + +- ❌ 不支持嵌入(Embedding)功能 +- ❌ 不支持音频转录功能 +- ✅ 仅支持流式响应(SSE特性) + +## 故障排查 + +### 连接失败 + +检查: +1. base_url是否正确 +2. API key是否有效 +3. 网络连接是否正常 +4. 服务器是否支持SSE协议 + +### 解析错误 + +检查: +1. 服务器返回的SSE格式是否符合MCP规范 +2. 查看日志中的详细错误信息 + +### 工具调用失败 + +检查: +1. 工具定义的schema是否正确 +2. 服务器是否支持工具调用功能 + +## 相关文档 + +- [MCP协议规范](https://github.com/anthropics/mcp) +- [SSE规范](https://developer.mozilla.org/en-US/docs/Web/API/Server-sent_events) +- [MoFox Bot文档](../README.md) + +## 更新日志 + +### v0.8.1 +- ✅ 添加MCP SSE客户端支持 +- ✅ 支持流式响应和工具调用 +- ✅ 支持多模态内容 diff --git a/plugins/bilibli/bilibli_base.py b/plugins/bilibli/bilibli_base.py index c35538dba..e6418f7c7 100644 --- a/plugins/bilibli/bilibli_base.py +++ b/plugins/bilibli/bilibli_base.py @@ -245,7 +245,7 @@ class BilibiliVideoAnalyzer: logger.exception("详细错误信息:") return None - async def analyze_bilibili_video(self, url: str, prompt: str = None) -> dict[str, Any]: + async def analyze_bilibili_video(self, url: str, prompt: str | None = None) -> dict[str, Any]: """分析哔哩哔哩视频并返回详细信息和AI分析结果""" try: logger.info(f"🎬 开始分析哔哩哔哩视频: {url}") diff --git a/pyproject.toml b/pyproject.toml index 17b361c5f..2ad3c5433 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -76,6 +76,8 @@ dependencies = [ "aiosqlite>=0.21.0", "inkfox>=0.1.0", "rrjieba>=0.1.13", + "mcp>=0.9.0", + "sse-starlette>=2.2.1", ] [[tool.uv.index]] diff --git a/requirements.txt b/requirements.txt index 86cd7d666..aa05f5b15 100644 --- a/requirements.txt +++ b/requirements.txt @@ -69,4 +69,6 @@ lunar_python fuzzywuzzy python-multipart aiofiles -inkfox \ No newline at end of file +inkfox +mcp +sse-starlette \ No newline at end of file diff --git a/src/chat/utils/utils_video.py b/src/chat/utils/utils_video.py index 6a6fc6245..78ea3a11c 100644 --- a/src/chat/utils/utils_video.py +++ b/src/chat/utils/utils_video.py @@ -1,9 +1,17 @@ #!/usr/bin/env python3 +"""纯 inkfox 视频关键帧分析工具 + +仅依赖 `inkfox.video` 提供的 Rust 扩展能力: + - extract_keyframes_from_video + - get_system_info + +功能: + - 关键帧提取 (base64, timestamp) + - 批量 / 逐帧 LLM 描述 + - 自动模式 (<=3 帧批量,否则逐帧) """ -视频分析器模块 - Rust优化版本 -集成了Rust视频关键帧提取模块,提供高性能的视频分析功能 -支持SIMD优化、多线程处理和智能关键帧检测 -""" + +from __future__ import annotations import asyncio import base64 @@ -13,913 +21,301 @@ import os import tempfile import time from pathlib import Path +from typing import Any -import numpy as np from PIL import Image -from sqlalchemy import select +from sqlalchemy import exc as sa_exc # type: ignore +from sqlalchemy import insert, select, update # type: ignore -from src.common.database.sqlalchemy_models import Videos, get_db_session +from src.common.database.sqlalchemy_models import Videos, get_db_session # type: ignore from src.common.logger import get_logger from src.config.config import global_config, model_config from src.llm_models.utils_model import LLMRequest +# 简易并发控制:同一 hash 只处理一次 +_video_locks: dict[str, asyncio.Lock] = {} +_locks_guard = asyncio.Lock() + logger = get_logger("utils_video") -# Rust模块可用性检测 -RUST_VIDEO_AVAILABLE = False -try: - import rust_video # pyright: ignore[reportMissingImports] - - RUST_VIDEO_AVAILABLE = True - logger.info("✅ Rust 视频处理模块加载成功") -except ImportError as e: - logger.warning(f"⚠️ Rust 视频处理模块加载失败: {e}") - logger.warning("⚠️ 视频识别功能将自动禁用") -except Exception as e: - logger.error(f"❌ 加载Rust模块时发生错误: {e}") - RUST_VIDEO_AVAILABLE = False - -# 全局正在处理的视频哈希集合,用于防止重复处理 -processing_videos = set() -processing_lock = asyncio.Lock() -# 为每个视频hash创建独立的锁和事件 -video_locks = {} -video_events = {} -video_lock_manager = asyncio.Lock() +from inkfox import video class VideoAnalyzer: - """优化的视频分析器类""" + """基于 inkfox 的视频关键帧 + LLM 描述分析器""" - def __init__(self): - """初始化视频分析器""" - # 检查是否有任何可用的视频处理实现 - opencv_available = False + def __init__(self) -> None: + cfg = getattr(global_config, "video_analysis", object()) + self.max_frames: int = getattr(cfg, "max_frames", 20) + self.frame_quality: int = getattr(cfg, "frame_quality", 85) + self.max_image_size: int = getattr(cfg, "max_image_size", 600) + self.enable_frame_timing: bool = getattr(cfg, "enable_frame_timing", True) + self.use_simd: bool = getattr(cfg, "rust_use_simd", True) + self.threads: int = getattr(cfg, "rust_threads", 0) + self.ffmpeg_path: str = getattr(cfg, "ffmpeg_path", "ffmpeg") + self.analysis_mode: str = getattr(cfg, "analysis_mode", "auto") + self.frame_analysis_delay: float = 0.3 + + # 人格与提示模板 try: - import cv2 - - opencv_available = True - except ImportError: - pass - - if not RUST_VIDEO_AVAILABLE and not opencv_available: - logger.error("❌ 没有可用的视频处理实现,视频分析器将被禁用") - self.disabled = True - return - elif not RUST_VIDEO_AVAILABLE: - logger.warning("⚠️ Rust视频处理模块不可用,将使用Python降级实现") - elif not opencv_available: - logger.warning("⚠️ OpenCV不可用,仅支持Rust关键帧模式") - - self.disabled = False - - # 使用专用的视频分析配置 - try: - self.video_llm = LLMRequest( - model_set=model_config.model_task_config.video_analysis, request_type="video_analysis" - ) - logger.debug("✅ 使用video_analysis模型配置") - except (AttributeError, KeyError) as e: - # 如果video_analysis不存在,使用vlm配置 - self.video_llm = LLMRequest(model_set=model_config.model_task_config.vlm, request_type="vlm") - logger.warning(f"video_analysis配置不可用({e}),回退使用vlm配置") - - # 从配置文件读取参数,如果配置不存在则使用默认值 - config = global_config.video_analysis - - # 使用 getattr 统一获取配置参数,如果配置不存在则使用默认值 - self.max_frames = getattr(config, "max_frames", 6) - self.frame_quality = getattr(config, "frame_quality", 85) - self.max_image_size = getattr(config, "max_image_size", 600) - self.enable_frame_timing = getattr(config, "enable_frame_timing", True) - - # Rust模块相关配置 - self.rust_keyframe_threshold = getattr(config, "rust_keyframe_threshold", 2.0) - self.rust_use_simd = getattr(config, "rust_use_simd", True) - self.rust_block_size = getattr(config, "rust_block_size", 8192) - self.rust_threads = getattr(config, "rust_threads", 0) - self.ffmpeg_path = getattr(config, "ffmpeg_path", "ffmpeg") - - # 从personality配置中获取人格信息 - try: - personality_config = global_config.personality - self.personality_core = getattr(personality_config, "personality_core", "是一个积极向上的女大学生") - self.personality_side = getattr( - personality_config, "personality_side", "用一句话或几句话描述人格的侧面特点" - ) - except AttributeError: - # 如果没有personality配置,使用默认值 + persona = global_config.personality + self.personality_core = getattr(persona, "personality_core", "是一个积极向上的女大学生") + self.personality_side = getattr(persona, "personality_side", "用一句话或几句话描述人格的侧面特点") + except Exception: # pragma: no cover self.personality_core = "是一个积极向上的女大学生" self.personality_side = "用一句话或几句话描述人格的侧面特点" self.batch_analysis_prompt = getattr( - config, + cfg, "batch_analysis_prompt", - """请以第一人称的视角来观看这一个视频,你看到的这些是从视频中按时间顺序提取的关键帧。 - -你的核心人设是:{personality_core}。 -你的人格细节是:{personality_side}。 - -请提供详细的视频内容描述,涵盖以下方面: -1. 视频的整体内容和主题 -2. 主要人物、对象和场景描述 -3. 动作、情节和时间线发展 -4. 视觉风格和艺术特点 -5. 整体氛围和情感表达 -6. 任何特殊的视觉效果或文字内容 - -请用中文回答,结果要详细准确。""", + """请以第一人称视角阅读这些按时间顺序提取的关键帧。\n核心:{personality_core}\n人格:{personality_side}\n请详细描述视频(主题/人物与场景/动作与时间线/视觉风格/情绪氛围/特殊元素)。""", ) - # 新增的线程池配置 - self.use_multiprocessing = getattr(config, "use_multiprocessing", True) - self.max_workers = getattr(config, "max_workers", 2) - self.frame_extraction_mode = getattr(config, "frame_extraction_mode", "fixed_number") - self.frame_interval_seconds = getattr(config, "frame_interval_seconds", 2.0) - - # 将配置文件中的模式映射到内部使用的模式名称 - config_mode = getattr(config, "analysis_mode", "auto") - if config_mode == "batch_frames": - self.analysis_mode = "batch" - elif config_mode == "frame_by_frame": - self.analysis_mode = "sequential" - elif config_mode == "auto": - self.analysis_mode = "auto" - else: - logger.warning(f"无效的分析模式: {config_mode},使用默认的auto模式") - self.analysis_mode = "auto" - - self.frame_analysis_delay = 0.3 # API调用间隔(秒) - self.frame_interval = 1.0 # 抽帧时间间隔(秒) - self.batch_size = 3 # 批处理时每批处理的帧数 - self.timeout = 60.0 # 分析超时时间(秒) - - if config: - logger.debug("✅ 从配置文件读取视频分析参数") - else: - logger.warning("配置文件中缺少video_analysis配置,使用默认值") - - # 系统提示词 - self.system_prompt = "你是一个专业的视频内容分析助手。请仔细观察用户提供的视频关键帧,详细描述视频内容。" - - logger.debug(f"✅ 视频分析器初始化完成,分析模式: {self.analysis_mode}, 线程池: {self.use_multiprocessing}") - - # 获取Rust模块系统信息 - self._log_system_info() - - def _log_system_info(self): - """记录系统信息""" - if not RUST_VIDEO_AVAILABLE: - logger.info("⚠️ Rust模块不可用,跳过系统信息获取") - return - try: - system_info = rust_video.get_system_info() - logger.debug(f"🔧 系统信息: 线程数={system_info.get('threads', '未知')}") - - # 记录CPU特性 - features = [] - if system_info.get("avx2_supported"): - features.append("AVX2") - if system_info.get("sse2_supported"): - features.append("SSE2") - if system_info.get("simd_supported"): - features.append("SIMD") - - if features: - logger.debug(f"🚀 CPU特性: {', '.join(features)}") - else: - logger.debug("⚠️ 未检测到SIMD支持") - - logger.debug(f"📦 Rust模块版本: {system_info.get('version', '未知')}") - - except Exception as e: - logger.warning(f"获取系统信息失败: {e}") - - def _calculate_video_hash(self, video_data: bytes) -> str: - """计算视频文件的hash值""" - hash_obj = hashlib.sha256() - hash_obj.update(video_data) - return hash_obj.hexdigest() - - async def _check_video_exists(self, video_hash: str) -> Videos | None: - """检查视频是否已经分析过""" - try: - async with get_db_session() as session: - if not session: - logger.warning("无法获取数据库会话,跳过视频存在性检查。") - return None - # 明确刷新会话以确保看到其他事务的最新提交 - await session.expire_all() - stmt = select(Videos).where(Videos.video_hash == video_hash) - result = await session.execute(stmt) - return result.scalar_one_or_none() - except Exception as e: - logger.warning(f"检查视频是否存在时出错: {e}") - return None - - async def _store_video_result( - self, video_hash: str, description: str, metadata: dict | None = None - ) -> Videos | None: - """存储视频分析结果到数据库""" - # 检查描述是否为错误信息,如果是则不保存 - if description.startswith("❌"): - logger.warning(f"⚠️ 检测到错误信息,不保存到数据库: {description[:50]}...") - return None - - try: - async with get_db_session() as session: - if not session: - logger.warning("无法获取数据库会话,跳过视频结果存储。") - return None - # 只根据video_hash查找 - stmt = select(Videos).where(Videos.video_hash == video_hash) - result = await session.execute(stmt) - existing_video = result.scalar_one_or_none() - - if existing_video: - # 如果已存在,更新描述和计数 - existing_video.description = description - existing_video.count += 1 - existing_video.timestamp = time.time() - if metadata: - existing_video.duration = metadata.get("duration") - existing_video.frame_count = metadata.get("frame_count") - existing_video.fps = metadata.get("fps") - existing_video.resolution = metadata.get("resolution") - existing_video.file_size = metadata.get("file_size") - await session.commit() - await session.refresh(existing_video) - logger.info(f"✅ 更新已存在的视频记录,hash: {video_hash[:16]}..., count: {existing_video.count}") - return existing_video - else: - video_record = Videos( - video_hash=video_hash, description=description, timestamp=time.time(), count=1 - ) - if metadata: - video_record.duration = metadata.get("duration") - video_record.frame_count = metadata.get("frame_count") - video_record.fps = metadata.get("fps") - video_record.resolution = metadata.get("resolution") - video_record.file_size = metadata.get("file_size") - - session.add(video_record) - await session.commit() - await session.refresh(video_record) - logger.info(f"✅ 新视频分析结果已保存到数据库,hash: {video_hash[:16]}...") - return video_record - except Exception as e: - logger.error(f"❌ 存储视频分析结果时出错: {e}") - return None - - def set_analysis_mode(self, mode: str): - """设置分析模式""" - if mode in ["batch", "sequential", "auto"]: - self.analysis_mode = mode - # logger.info(f"分析模式已设置为: {mode}") - else: - logger.warning(f"无效的分析模式: {mode}") - - async def extract_frames(self, video_path: str) -> list[tuple[str, float]]: - """提取视频帧 - 智能选择最佳实现""" - # 检查是否应该使用Rust实现 - if RUST_VIDEO_AVAILABLE and self.frame_extraction_mode == "keyframe": - # 优先尝试Rust关键帧提取 - try: - return await self._extract_frames_rust_advanced(video_path) - except Exception as e: - logger.warning(f"Rust高级接口失败: {e},尝试基础接口") - try: - return await self._extract_frames_rust(video_path) - except Exception as e2: - logger.warning(f"Rust基础接口也失败: {e2},降级到Python实现") - return await self._extract_frames_python_fallback(video_path) - else: - # 使用Python实现(支持time_interval和fixed_number模式) - if not RUST_VIDEO_AVAILABLE: - logger.info("🔄 Rust模块不可用,使用Python抽帧实现") - else: - logger.info(f"🔄 抽帧模式为 {self.frame_extraction_mode},使用Python抽帧实现") - return await self._extract_frames_python_fallback(video_path) - - async def _extract_frames_rust_advanced(self, video_path: str) -> list[tuple[str, float]]: - """使用 Rust 高级接口的帧提取""" - try: - logger.info("🔄 使用 Rust 高级接口提取关键帧...") - - # 创建 Rust 视频处理器,使用配置参数 - extractor = rust_video.VideoKeyframeExtractor( - ffmpeg_path=self.ffmpeg_path, - threads=self.rust_threads, - verbose=False, # 使用固定值,不需要配置 + self.video_llm = LLMRequest( + model_set=model_config.model_task_config.video_analysis, request_type="video_analysis" ) + except Exception: + self.video_llm = LLMRequest(model_set=model_config.model_task_config.vlm, request_type="vlm") - # 1. 提取所有帧 - frames_data, width, height = extractor.extract_frames( + self._log_system() + + # ---- 系统信息 ---- + def _log_system(self) -> None: + try: + info = video.get_system_info() # type: ignore[attr-defined] + logger.info( + f"inkfox: threads={info.get('threads')} version={info.get('version')} simd={info.get('simd_supported')}" + ) + except Exception as e: # pragma: no cover + logger.debug(f"获取系统信息失败: {e}") + + # ---- 关键帧提取 ---- + async def extract_keyframes(self, video_path: str) -> list[tuple[str, float]]: + """提取关键帧并返回 (base64, timestamp_seconds) 列表""" + with tempfile.TemporaryDirectory() as tmp: + result = video.extract_keyframes_from_video( # type: ignore[attr-defined] video_path=video_path, - max_frames=self.max_frames * 3, # 提取更多帧用于关键帧检测 + output_dir=tmp, + max_keyframes=self.max_frames * 2, # 先多抓一点再截断 + max_save=self.max_frames, + ffmpeg_path=self.ffmpeg_path, + use_simd=self.use_simd, + threads=self.threads, + verbose=False, ) - - logger.info(f"提取到 {len(frames_data)} 帧,视频尺寸: {width}x{height}") - - # 2. 检测关键帧,使用配置参数 - keyframe_indices = extractor.extract_keyframes( - frames=frames_data, - threshold=self.rust_keyframe_threshold, - use_simd=self.rust_use_simd, - block_size=self.rust_block_size, - ) - - logger.info(f"检测到 {len(keyframe_indices)} 个关键帧") - - # 3. 转换选定的关键帧为 base64 - frames = [] - frame_count = 0 - - for idx in keyframe_indices[: self.max_frames]: - if idx < len(frames_data): - try: - frame = frames_data[idx] - frame_data = frame.get_data() - - # 将灰度数据转换为PIL图像 - frame_array = np.frombuffer(frame_data, dtype=np.uint8).reshape((frame.height, frame.width)) - pil_image = Image.fromarray( - frame_array, - mode="L", # 灰度模式 - ) - - # 转换为RGB模式以便保存为JPEG - pil_image = pil_image.convert("RGB") - - # 调整图像大小 - if max(pil_image.size) > self.max_image_size: - ratio = self.max_image_size / max(pil_image.size) - new_size = tuple(int(dim * ratio) for dim in pil_image.size) - pil_image = pil_image.resize(new_size, Image.Resampling.LANCZOS) - - # 转换为 base64 - buffer = io.BytesIO() - pil_image.save(buffer, format="JPEG", quality=self.frame_quality) - frame_base64 = base64.b64encode(buffer.getvalue()).decode("utf-8") - - # 估算时间戳 - estimated_timestamp = frame.frame_number * (1.0 / 30.0) # 假设30fps - - frames.append((frame_base64, estimated_timestamp)) - frame_count += 1 - - logger.debug( - f"处理关键帧 {frame_count}: 帧号 {frame.frame_number}, 时间 {estimated_timestamp:.2f}s" - ) - - except Exception as e: - logger.error(f"处理关键帧 {idx} 失败: {e}") - continue - - logger.info(f"✅ Rust 高级提取完成: {len(frames)} 关键帧") + files = sorted(Path(tmp).glob("keyframe_*.jpg"))[: self.max_frames] + total_ms = getattr(result, "total_time_ms", 0) + frames: list[tuple[str, float]] = [] + for i, f in enumerate(files): + img = Image.open(f).convert("RGB") + if max(img.size) > self.max_image_size: + scale = self.max_image_size / max(img.size) + img = img.resize((int(img.width * scale), int(img.height * scale)), Image.Resampling.LANCZOS) + buf = io.BytesIO() + img.save(buf, format="JPEG", quality=self.frame_quality) + b64 = base64.b64encode(buf.getvalue()).decode() + ts = (i / max(1, len(files) - 1)) * (total_ms / 1000.0) if total_ms else float(i) + frames.append((b64, ts)) return frames - except Exception as e: - logger.error(f"❌ Rust 高级帧提取失败: {e}") - # 回退到基础方法 - logger.info("回退到基础 Rust 方法") - return await self._extract_frames_rust(video_path) + # ---- 批量分析 ---- + async def _analyze_batch(self, frames: list[tuple[str, float]], question: str | None) -> str: + from src.llm_models.payload_content.message import MessageBuilder + from src.llm_models.utils_model import RequestType - async def _extract_frames_rust(self, video_path: str) -> list[tuple[str, float]]: - """使用 Rust 实现的帧提取""" - try: - logger.info("🔄 使用 Rust 模块提取关键帧...") - - # 创建临时输出目录 - with tempfile.TemporaryDirectory() as temp_dir: - # 使用便捷函数进行关键帧提取,使用配置参数 - result = rust_video.extract_keyframes_from_video( - video_path=video_path, - output_dir=temp_dir, - threshold=self.rust_keyframe_threshold, - max_frames=self.max_frames * 2, # 提取更多帧以便筛选 - max_save=self.max_frames, - ffmpeg_path=self.ffmpeg_path, - use_simd=self.rust_use_simd, - threads=self.rust_threads, - verbose=False, # 使用固定值,不需要配置 - ) - - logger.info( - f"Rust 处理完成: 总帧数 {result.total_frames}, 关键帧 {result.keyframes_extracted}, 处理速度 {result.processing_fps:.1f} FPS" - ) - - # 转换保存的关键帧为 base64 格式 - frames = [] - temp_dir_path = Path(temp_dir) - - # 获取所有保存的关键帧文件 - keyframe_files = sorted(temp_dir_path.glob("keyframe_*.jpg")) - - for i, keyframe_file in enumerate(keyframe_files): - if len(frames) >= self.max_frames: - break - - try: - # 读取关键帧文件 - with open(keyframe_file, "rb") as f: - image_data = f.read() - - # 转换为 PIL 图像并压缩 - pil_image = Image.open(io.BytesIO(image_data)) - - # 调整图像大小 - if max(pil_image.size) > self.max_image_size: - ratio = self.max_image_size / max(pil_image.size) - new_size = tuple(int(dim * ratio) for dim in pil_image.size) - pil_image = pil_image.resize(new_size, Image.Resampling.LANCZOS) - - # 转换为 base64 - buffer = io.BytesIO() - pil_image.save(buffer, format="JPEG", quality=self.frame_quality) - frame_base64 = base64.b64encode(buffer.getvalue()).decode("utf-8") - - # 估算时间戳(基于帧索引和总时长) - if result.total_frames > 0: - # 假设关键帧在时间上均匀分布 - estimated_timestamp = (i * result.total_time_ms / 1000.0) / result.keyframes_extracted - else: - estimated_timestamp = i * 1.0 # 默认每秒一帧 - - frames.append((frame_base64, estimated_timestamp)) - - logger.debug(f"处理关键帧 {i + 1}: 估算时间 {estimated_timestamp:.2f}s") - - except Exception as e: - logger.error(f"处理关键帧 {keyframe_file.name} 失败: {e}") - continue - - logger.info(f"✅ Rust 提取完成: {len(frames)} 关键帧") - return frames - - except Exception as e: - logger.error(f"❌ Rust 帧提取失败: {e}") - raise e - - async def _extract_frames_python_fallback(self, video_path: str) -> list[tuple[str, float]]: - """Python降级抽帧实现 - 支持多种抽帧模式""" - try: - # 导入旧版本分析器 - from .utils_video_legacy import get_legacy_video_analyzer - - logger.info("🔄 使用Python降级抽帧实现...") - legacy_analyzer = get_legacy_video_analyzer() - - # 同步配置参数 - legacy_analyzer.max_frames = self.max_frames - legacy_analyzer.frame_quality = self.frame_quality - legacy_analyzer.max_image_size = self.max_image_size - legacy_analyzer.frame_extraction_mode = self.frame_extraction_mode - legacy_analyzer.frame_interval_seconds = self.frame_interval_seconds - legacy_analyzer.use_multiprocessing = self.use_multiprocessing - - # 使用旧版本的抽帧功能 - frames = await legacy_analyzer.extract_frames(video_path) - - logger.info(f"✅ Python降级抽帧完成: {len(frames)} 帧") - return frames - - except Exception as e: - logger.error(f"❌ Python降级抽帧失败: {e}") - return [] - - async def analyze_frames_batch(self, frames: list[tuple[str, float]], user_question: str = None) -> str: - """批量分析所有帧""" - logger.info(f"开始批量分析{len(frames)}帧") - - if not frames: - return "❌ 没有可分析的帧" - - # 构建提示词并格式化人格信息,要不然占位符的那个会爆炸 prompt = self.batch_analysis_prompt.format( personality_core=self.personality_core, personality_side=self.personality_side ) + if question: + prompt += f"\n用户关注: {question}" - if user_question: - prompt += f"\n\n用户问题: {user_question}" + desc = [ + (f"第{i+1}帧 (时间: {ts:.2f}s)" if self.enable_frame_timing else f"第{i+1}帧") + for i, (_b, ts) in enumerate(frames) + ] + prompt += "\n帧列表: " + ", ".join(desc) - # 添加帧信息到提示词 - frame_info = [] - for i, (_frame_base64, timestamp) in enumerate(frames): - if self.enable_frame_timing: - frame_info.append(f"第{i + 1}帧 (时间: {timestamp:.2f}s)") - else: - frame_info.append(f"第{i + 1}帧") + message_builder = MessageBuilder().add_text_content(prompt) + for b64, _ in frames: + message_builder.add_image_content(image_format="jpeg", image_base64=b64) + messages = [message_builder.build()] - prompt += f"\n\n视频包含{len(frames)}帧图像:{', '.join(frame_info)}" - prompt += "\n\n请基于所有提供的帧图像进行综合分析,关注并描述视频的完整内容和故事发展。" - - try: - # 使用多图片分析 - response = await self._analyze_multiple_frames(frames, prompt) - logger.info("✅ 视频识别完成") - return response - - except Exception as e: - logger.error(f"❌ 视频识别失败: {e}") - raise e - - async def _analyze_multiple_frames(self, frames: list[tuple[str, float]], prompt: str) -> str: - """使用多图片分析方法""" - logger.info(f"开始构建包含{len(frames)}帧的分析请求") - - # 导入MessageBuilder用于构建多图片消息 - from src.llm_models.payload_content.message import MessageBuilder, RoleType - from src.llm_models.utils_model import RequestType - - # 构建包含多张图片的消息 - message_builder = MessageBuilder().set_role(RoleType.User).add_text_content(prompt) - - # 添加所有帧图像 - for _i, (frame_base64, _timestamp) in enumerate(frames): - message_builder.add_image_content("jpeg", frame_base64) - # logger.info(f"已添加第{i+1}帧到分析请求 (时间: {timestamp:.2f}s, 图片大小: {len(frame_base64)} chars)") - - message = message_builder.build() - # logger.info(f"✅ 多帧消息构建完成,包含{len(frames)}张图片") - - # 获取模型信息和客户端 - selection_result = self.video_llm._model_selector.select_best_available_model(set(), "response") - if not selection_result: - raise RuntimeError("无法为视频分析选择可用模型。") - model_info, api_provider, client = selection_result - # logger.info(f"使用模型: {model_info.name} 进行多帧分析") - - # 直接执行多图片请求 - api_response = await self.video_llm._executor.execute_request( - api_provider=api_provider, - client=client, - request_type=RequestType.RESPONSE, - model_info=model_info, - message_list=[message], - temperature=None, - max_tokens=None, + # 使用封装好的高级策略执行请求,而不是直接调用内部方法 + response, _ = await self.video_llm._strategy.execute_with_failover( + RequestType.RESPONSE, + raise_when_empty=False, # 即使失败也返回默认值,避免程序崩溃 + message_list=messages, + temperature=self.video_llm.model_for_task.temperature, + max_tokens=self.video_llm.model_for_task.max_tokens, ) - logger.info(f"视频识别完成,响应长度: {len(api_response.content or '')} ") - return api_response.content or "❌ 未获得响应内容" + return response.content or "❌ 未获得响应" - async def analyze_frames_sequential(self, frames: list[tuple[str, float]], user_question: str = None) -> str: - """逐帧分析并汇总""" - logger.info(f"开始逐帧分析{len(frames)}帧") - - frame_analyses = [] - - for i, (frame_base64, timestamp) in enumerate(frames): + # ---- 逐帧分析 ---- + async def _analyze_sequential(self, frames: list[tuple[str, float]], question: str | None) -> str: + results: list[str] = [] + for i, (b64, ts) in enumerate(frames): + prompt = f"分析第{i+1}帧" + (f" (时间: {ts:.2f}s)" if self.enable_frame_timing else "") + if question: + prompt += f"\n关注: {question}" try: - prompt = f"请分析这个视频的第{i + 1}帧" - if self.enable_frame_timing: - prompt += f" (时间: {timestamp:.2f}s)" - prompt += "。描述你看到的内容,包括人物、动作、场景、文字等。" - - if user_question: - prompt += f"\n特别关注: {user_question}" - - response, _ = await self.video_llm.generate_response_for_image( - prompt=prompt, image_base64=frame_base64, image_format="jpeg" + text, _ = await self.video_llm.generate_response_for_image( + prompt=prompt, image_base64=b64, image_format="jpeg" ) - - frame_analyses.append(f"第{i + 1}帧 ({timestamp:.2f}s): {response}") - logger.debug(f"✅ 第{i + 1}帧分析完成") - - # API调用间隔 - if i < len(frames) - 1: - await asyncio.sleep(self.frame_analysis_delay) - - except Exception as e: - logger.error(f"❌ 第{i + 1}帧分析失败: {e}") - frame_analyses.append(f"第{i + 1}帧: 分析失败 - {e}") - - # 生成汇总 - logger.info("开始生成汇总分析") - summary_prompt = f"""基于以下各帧的分析结果,请提供一个完整的视频内容总结: - -{chr(10).join(frame_analyses)} - -请综合所有帧的信息,描述视频的整体内容、故事线、主要元素和特点。""" - - if user_question: - summary_prompt += f"\n特别回答用户的问题: {user_question}" - + results.append(f"第{i+1}帧: {text}") + except Exception as e: # pragma: no cover + results.append(f"第{i+1}帧: 失败 {e}") + if i < len(frames) - 1: + await asyncio.sleep(self.frame_analysis_delay) + summary_prompt = "基于以下逐帧结果给出完整总结:\n\n" + "\n".join(results) try: - # 使用最后一帧进行汇总分析 - if frames: - last_frame_base64, _ = frames[-1] - summary, _ = await self.video_llm.generate_response_for_image( - prompt=summary_prompt, image_base64=last_frame_base64, image_format="jpeg" - ) - logger.info("✅ 逐帧分析和汇总完成") - return summary - else: - return "❌ 没有可用于汇总的帧" - except Exception as e: - logger.error(f"❌ 汇总分析失败: {e}") - # 如果汇总失败,返回各帧分析结果 - return f"视频逐帧分析结果:\n\n{chr(10).join(frame_analyses)}" + final, _ = await self.video_llm.generate_response_for_image( + prompt=summary_prompt, image_base64=frames[-1][0], image_format="jpeg" + ) + return final + except Exception: # pragma: no cover + return "\n".join(results) - async def analyze_video(self, video_path: str, user_question: str = None) -> tuple[bool, str]: - """分析视频的主要方法 - - Returns: - Tuple[bool, str]: (是否成功, 分析结果或错误信息) - """ - if self.disabled: - error_msg = "❌ 视频分析功能已禁用:没有可用的视频处理实现" - logger.warning(error_msg) - return (False, error_msg) - - try: - logger.info(f"开始分析视频: {os.path.basename(video_path)}") - - # 提取帧 - frames = await self.extract_frames(video_path) - if not frames: - error_msg = "❌ 无法从视频中提取有效帧" - return (False, error_msg) - - # 根据模式选择分析方法 - if self.analysis_mode == "auto": - # 智能选择:少于等于3帧用批量,否则用逐帧 - mode = "batch" if len(frames) <= 3 else "sequential" - logger.info(f"自动选择分析模式: {mode} (基于{len(frames)}帧)") - else: - mode = self.analysis_mode - - # 执行分析 - if mode == "batch": - result = await self.analyze_frames_batch(frames, user_question) - else: # sequential - result = await self.analyze_frames_sequential(frames, user_question) - - logger.info("✅ 视频分析完成") - return (True, result) - - except Exception as e: - error_msg = f"❌ 视频分析失败: {e!s}" - logger.error(error_msg) - return (False, error_msg) + # ---- 主入口 ---- + async def analyze_video(self, video_path: str, question: str | None = None) -> tuple[bool, str]: + if not os.path.exists(video_path): + return False, "❌ 文件不存在" + frames = await self.extract_keyframes(video_path) + if not frames: + return False, "❌ 未提取到关键帧" + mode = self.analysis_mode + if mode == "auto": + mode = "batch" if len(frames) <= 20 else "sequential" + text = await (self._analyze_batch(frames, question) if mode == "batch" else self._analyze_sequential(frames, question)) + return True, text async def analyze_video_from_bytes( - self, video_bytes: bytes, filename: str = None, user_question: str = None, prompt: str = None + self, + video_bytes: bytes, + filename: str | None = None, + prompt: str | None = None, + question: str | None = None, ) -> dict[str, str]: - """从字节数据分析视频 + """从内存字节分析视频,兼容旧调用 (prompt / question 二选一) 返回 {"summary": str}.""" + if not video_bytes: + return {"summary": "❌ 空视频数据"} + # 兼容参数:prompt 优先,其次 question + q = prompt if prompt is not None else question + video_hash = hashlib.sha256(video_bytes).hexdigest() - Args: - video_bytes: 视频字节数据 - filename: 文件名(可选,仅用于日志) - user_question: 用户问题(旧参数名,保持兼容性) - prompt: 提示词(新参数名,与系统调用保持一致) + # 查缓存(第一次,未加锁) + cached = await self._get_cached(video_hash) + if cached: + logger.info(f"视频缓存命中(预检查) hash={video_hash[:16]}") + return {"summary": cached} - Returns: - Dict[str, str]: 包含分析结果的字典,格式为 {"summary": "分析结果"} - """ - if self.disabled: - return {"summary": "❌ 视频分析功能已禁用:没有可用的视频处理实现"} + # 获取锁避免重复处理 + async with _locks_guard: + lock = _video_locks.get(video_hash) + if lock is None: + lock = asyncio.Lock() + _video_locks[video_hash] = lock + async with lock: + # 双检缓存 + cached2 = await self._get_cached(video_hash) + if cached2: + logger.info(f"视频缓存命中(锁后) hash={video_hash[:16]}") + return {"summary": cached2} - video_hash = None - video_event = None - - try: - logger.info("开始从字节数据分析视频") - - # 兼容性处理:如果传入了prompt参数,使用prompt;否则使用user_question - question = prompt if prompt is not None else user_question - - # 检查视频数据是否有效 - if not video_bytes: - return {"summary": "❌ 视频数据为空"} - - # 计算视频hash值 - video_hash = self._calculate_video_hash(video_bytes) - logger.info(f"视频hash: {video_hash}") - - # 改进的并发控制:使用每个视频独立的锁和事件 - async with video_lock_manager: - if video_hash not in video_locks: - video_locks[video_hash] = asyncio.Lock() - video_events[video_hash] = asyncio.Event() - - video_lock = video_locks[video_hash] - video_event = video_events[video_hash] - - # 尝试获取该视频的专用锁 - if video_lock.locked(): - logger.info(f"⏳ 相同视频正在处理中,等待处理完成... (hash: {video_hash[:16]}...)") - try: - # 等待处理完成的事件信号,最多等待60秒 - await asyncio.wait_for(video_event.wait(), timeout=60.0) - logger.info("✅ 等待结束,检查是否有处理结果") - - # 检查是否有结果了 - existing_video = await self._check_video_exists(video_hash) - if existing_video: - logger.info(f"✅ 找到了处理结果,直接返回 (id: {existing_video.id})") - return {"summary": existing_video.description} - else: - logger.warning("⚠️ 等待完成但未找到结果,可能处理失败") - except asyncio.TimeoutError: - logger.warning("⚠️ 等待超时(60秒),放弃等待") - - # 获取锁开始处理 - async with video_lock: - logger.info(f"🔒 获得视频处理锁,开始处理 (hash: {video_hash[:16]}...)") - - # 再次检查数据库(可能在等待期间已经有结果了) - existing_video = await self._check_video_exists(video_hash) - if existing_video: - logger.info(f"✅ 获得锁后发现已有结果,直接返回 (id: {existing_video.id})") - video_event.set() # 通知其他等待者 - return {"summary": existing_video.description} - - # 未找到已存在记录,开始新的分析 - logger.info("未找到已存在的视频记录,开始新的分析") - - # 创建临时文件进行分析 - with tempfile.NamedTemporaryFile(delete=False, suffix=".mp4") as temp_file: - temp_file.write(video_bytes) - temp_path = temp_file.name - - try: - # 检查临时文件是否创建成功 - if not os.path.exists(temp_path): - video_event.set() # 通知等待者 - return {"summary": "❌ 临时文件创建失败"} - - # 使用临时文件进行分析 - success, result = await self.analyze_video(temp_path, question) - - finally: - # 清理临时文件 - if os.path.exists(temp_path): - os.unlink(temp_path) - - # 保存分析结果到数据库(仅保存成功的结果) - if success and not result.startswith("❌"): - metadata = {"filename": filename, "file_size": len(video_bytes), "analysis_timestamp": time.time()} - await self._store_video_result(video_hash=video_hash, description=result, metadata=metadata) - logger.info("✅ 分析结果已保存到数据库") - else: - logger.warning("⚠️ 分析失败,不保存到数据库以便后续重试") - - # 处理完成,通知等待者并清理资源 - video_event.set() - async with video_lock_manager: - # 清理资源 - video_locks.pop(video_hash, None) - video_events.pop(video_hash, None) - - return {"summary": result} - - except Exception as e: - error_msg = f"❌ 从字节数据分析视频失败: {e!s}" - logger.error(error_msg) - - # 不保存错误信息到数据库,允许后续重试 - logger.info("💡 错误信息不保存到数据库,允许后续重试") - - # 处理失败,通知等待者并清理资源 try: - if video_hash and video_event: - async with video_lock_manager: - if video_hash in video_events: - video_events[video_hash].set() - video_locks.pop(video_hash, None) - video_events.pop(video_hash, None) - except Exception as cleanup_e: - logger.error(f"❌ 清理锁资源失败: {cleanup_e}") - - return {"summary": error_msg} - - def is_supported_video(self, file_path: str) -> bool: - """检查是否为支持的视频格式""" - supported_formats = {".mp4", ".avi", ".mov", ".mkv", ".flv", ".wmv", ".m4v", ".3gp", ".webm"} - return Path(file_path).suffix.lower() in supported_formats - - def get_processing_capabilities(self) -> dict[str, any]: - """获取处理能力信息""" - if not RUST_VIDEO_AVAILABLE: - return {"error": "Rust视频处理模块不可用", "available": False, "reason": "rust_video模块未安装或加载失败"} + with tempfile.NamedTemporaryFile(delete=False) as fp: + fp.write(video_bytes) + temp_path = fp.name + try: + ok, summary = await self.analyze_video(temp_path, q) + # 写入缓存(仅成功) + if ok: + await self._save_cache(video_hash, summary, len(video_bytes)) + return {"summary": summary} + finally: + if os.path.exists(temp_path): + try: + os.remove(temp_path) + except Exception: # pragma: no cover + pass + except Exception as e: # pragma: no cover + return {"summary": f"❌ 处理失败: {e}"} + # ---- 缓存辅助 ---- + async def _get_cached(self, video_hash: str) -> str | None: try: - system_info = rust_video.get_system_info() + async with get_db_session() as session: # type: ignore + result = await session.execute(select(Videos).where(Videos.video_hash == video_hash)) # type: ignore + obj: Videos | None = result.scalar_one_or_none() # type: ignore + if obj and obj.vlm_processed and obj.description: + # 更新使用次数 + try: + await session.execute( + update(Videos) + .where(Videos.id == obj.id) # type: ignore + .values(count=obj.count + 1 if obj.count is not None else 1) + ) + await session.commit() + except Exception: # pragma: no cover + await session.rollback() + return obj.description + except Exception: # pragma: no cover + pass + return None - # 创建一个临时的extractor来获取CPU特性 - extractor = rust_video.VideoKeyframeExtractor(threads=0, verbose=False) - cpu_features = extractor.get_cpu_features() - - capabilities = { - "system": { - "threads": system_info.get("threads", 0), - "rust_version": system_info.get("version", "unknown"), - }, - "cpu_features": cpu_features, - "recommended_settings": self._get_recommended_settings(cpu_features), - "analysis_modes": ["auto", "batch", "sequential"], - "supported_formats": [".mp4", ".avi", ".mov", ".mkv", ".flv", ".wmv", ".m4v", ".3gp", ".webm"], - "available": True, - } - - return capabilities - - except Exception as e: - logger.error(f"获取处理能力信息失败: {e}") - return {"error": str(e), "available": False} - - def _get_recommended_settings(self, cpu_features: dict[str, bool]) -> dict[str, any]: - """根据CPU特性推荐最佳设置""" - settings = { - "use_simd": any(cpu_features.values()), - "block_size": 8192, - "threads": 0, # 自动检测 - } - - # 根据CPU特性调整设置 - if cpu_features.get("avx2", False): - settings["block_size"] = 16384 # AVX2支持更大的块 - settings["optimization_level"] = "avx2" - elif cpu_features.get("sse2", False): - settings["block_size"] = 8192 - settings["optimization_level"] = "sse2" - else: - settings["use_simd"] = False - settings["block_size"] = 4096 - settings["optimization_level"] = "scalar" - - return settings + async def _save_cache(self, video_hash: str, summary: str, file_size: int) -> None: + try: + async with get_db_session() as session: # type: ignore + stmt = insert(Videos).values( # type: ignore + video_id="", + video_hash=video_hash, + description=summary, + count=1, + timestamp=time.time(), + vlm_processed=True, + duration=None, + frame_count=None, + fps=None, + resolution=None, + file_size=file_size, + ) + try: + await session.execute(stmt) + await session.commit() + logger.debug(f"视频缓存写入 success hash={video_hash}") + except sa_exc.IntegrityError: # 可能并发已写入 + await session.rollback() + logger.debug(f"视频缓存已存在 hash={video_hash}") + except Exception: # pragma: no cover + logger.debug("视频缓存写入失败") -# 全局实例 -_video_analyzer = None +# ---- 外部接口 ---- +_INSTANCE: VideoAnalyzer | None = None def get_video_analyzer() -> VideoAnalyzer: - """获取视频分析器实例(单例模式)""" - global _video_analyzer - if _video_analyzer is None: - _video_analyzer = VideoAnalyzer() - return _video_analyzer + global _INSTANCE + if _INSTANCE is None: + _INSTANCE = VideoAnalyzer() + return _INSTANCE def is_video_analysis_available() -> bool: - """检查视频分析功能是否可用 + return True - Returns: - bool: 如果有任何可用的视频处理实现则返回True - """ - # 现在即使Rust模块不可用,也可以使用Python降级实现 + +def get_video_analysis_status() -> dict[str, Any]: try: - import cv2 - - return True - except ImportError: - return False - - -def get_video_analysis_status() -> dict[str, any]: - """获取视频分析功能的详细状态信息 - - Returns: - Dict[str, any]: 包含功能状态信息的字典 - """ - # 检查OpenCV是否可用 - opencv_available = False - try: - import cv2 - - opencv_available = True - except ImportError: - pass - - status = { - "available": opencv_available or RUST_VIDEO_AVAILABLE, - "implementations": { - "rust_keyframe": { - "available": RUST_VIDEO_AVAILABLE, - "description": "Rust智能关键帧提取", - "supported_modes": ["keyframe"], - }, - "python_legacy": { - "available": opencv_available, - "description": "Python传统抽帧方法", - "supported_modes": ["fixed_number", "time_interval"], - }, - }, - "supported_modes": [], + info = video.get_system_info() # type: ignore[attr-defined] + except Exception as e: # pragma: no cover + return {"available": False, "error": str(e)} + inst = get_video_analyzer() + return { + "available": True, + "system": info, + "modes": ["auto", "batch", "sequential"], + "max_frames_default": inst.max_frames, + "implementation": "inkfox", } - - # 汇总支持的模式 - if RUST_VIDEO_AVAILABLE: - status["supported_modes"].extend(["keyframe"]) - if opencv_available: - status["supported_modes"].extend(["fixed_number", "time_interval"]) - - if not status["available"]: - status.update({"error": "没有可用的视频处理实现", "solution": "请安装opencv-python或rust_video模块"}) - - return status diff --git a/src/chat/utils/utils_video_legacy.py b/src/chat/utils/utils_video_legacy.py index 46eb13857..7f5d0a35b 100644 --- a/src/chat/utils/utils_video_legacy.py +++ b/src/chat/utils/utils_video_legacy.py @@ -461,14 +461,11 @@ class LegacyVideoAnalyzer: # logger.info(f"✅ 多帧消息构建完成,包含{len(frames)}张图片") # 获取模型信息和客户端 - selection_result = self.video_llm._model_selector.select_best_available_model(set(), "response") - if not selection_result: - raise RuntimeError("无法为视频分析选择可用模型 (legacy)。") - model_info, api_provider, client = selection_result + model_info, api_provider, client = self.video_llm._select_model() # logger.info(f"使用模型: {model_info.name} 进行多帧分析") # 直接执行多图片请求 - api_response = await self.video_llm._executor.execute_request( + api_response = await self.video_llm._execute_request( api_provider=api_provider, client=client, request_type=RequestType.RESPONSE, diff --git a/src/llm_models/model_client/__init__.py b/src/llm_models/model_client/__init__.py index 6c4151c41..e89e65a80 100644 --- a/src/llm_models/model_client/__init__.py +++ b/src/llm_models/model_client/__init__.py @@ -6,3 +6,5 @@ if "openai" in used_client_types: from . import openai_client # noqa: F401 if "aiohttp_gemini" in used_client_types: from . import aiohttp_gemini_client # noqa: F401 +if "mcp_sse" in used_client_types: + from . import mcp_sse_client # noqa: F401 diff --git a/src/llm_models/model_client/mcp_sse_client.py b/src/llm_models/model_client/mcp_sse_client.py new file mode 100644 index 000000000..ec4502dbb --- /dev/null +++ b/src/llm_models/model_client/mcp_sse_client.py @@ -0,0 +1,410 @@ +""" +MCP (Model Context Protocol) SSE (Server-Sent Events) 客户端实现 +支持通过SSE协议与MCP服务器进行通信 +""" + +import asyncio +import io +import json +from collections.abc import Callable +from typing import Any + +import aiohttp +import orjson +from json_repair import repair_json + +from src.common.logger import get_logger +from src.config.api_ada_configs import APIProvider, ModelInfo + +from ..exceptions import ( + NetworkConnectionError, + ReqAbortException, + RespNotOkException, + RespParseException, +) +from ..payload_content.message import Message, RoleType +from ..payload_content.resp_format import RespFormat +from ..payload_content.tool_option import ToolCall, ToolOption +from .base_client import APIResponse, BaseClient, UsageRecord, client_registry + +logger = get_logger("MCP-SSE客户端") + + +def _convert_messages_to_mcp(messages: list[Message]) -> list[dict[str, Any]]: + """ + 将消息列表转换为MCP协议格式 + :param messages: 消息列表 + :return: MCP格式的消息列表 + """ + mcp_messages = [] + + for message in messages: + mcp_msg: dict[str, Any] = { + "role": message.role.value, + } + + # 处理内容 + if isinstance(message.content, str): + mcp_msg["content"] = message.content + elif isinstance(message.content, list): + # 处理多模态内容 + content_parts = [] + for item in message.content: + if isinstance(item, tuple): + # 图片内容 + content_parts.append({ + "type": "image", + "source": { + "type": "base64", + "media_type": f"image/{item[0].lower()}", + "data": item[1], + }, + }) + elif isinstance(item, str): + # 文本内容 + content_parts.append({"type": "text", "text": item}) + mcp_msg["content"] = content_parts + + # 添加工具调用ID(如果是工具消息) + if message.role == RoleType.Tool and message.tool_call_id: + mcp_msg["tool_call_id"] = message.tool_call_id + + mcp_messages.append(mcp_msg) + + return mcp_messages + + +def _convert_tools_to_mcp(tool_options: list[ToolOption]) -> list[dict[str, Any]]: + """ + 将工具选项转换为MCP协议格式 + :param tool_options: 工具选项列表 + :return: MCP格式的工具列表 + """ + mcp_tools = [] + + for tool in tool_options: + mcp_tool = { + "name": tool.name, + "description": tool.description, + } + + if tool.params: + properties = {} + required = [] + + for param in tool.params: + properties[param.name] = { + "type": param.param_type.value, + "description": param.description, + } + + if param.enum_values: + properties[param.name]["enum"] = param.enum_values + + if param.required: + required.append(param.name) + + mcp_tool["input_schema"] = { + "type": "object", + "properties": properties, + "required": required, + } + + mcp_tools.append(mcp_tool) + + return mcp_tools + + +async def _parse_sse_stream( + session: aiohttp.ClientSession, + url: str, + payload: dict[str, Any], + headers: dict[str, str], + interrupt_flag: asyncio.Event | None = None, +) -> tuple[APIResponse, tuple[int, int, int] | None]: + """ + 解析SSE流式响应 + :param session: aiohttp会话 + :param url: 请求URL + :param payload: 请求负载 + :param headers: 请求头 + :param interrupt_flag: 中断标志 + :return: API响应和使用记录 + """ + content_buffer = io.StringIO() + reasoning_buffer = io.StringIO() + tool_calls_buffer: list[tuple[str, str, dict[str, Any]]] = [] + usage_record = None + + try: + async with session.post(url, json=payload, headers=headers) as response: + if response.status != 200: + error_text = await response.text() + raise RespNotOkException( + response.status, f"MCP SSE请求失败: {error_text}" + ) + + # 解析SSE流 + async for line in response.content: + if interrupt_flag and interrupt_flag.is_set(): + raise ReqAbortException("请求被外部信号中断") + + decoded_line = line.decode("utf-8").strip() + + # 跳过空行和注释 + if not decoded_line or decoded_line.startswith(":"): + continue + + # 解析SSE事件 + if decoded_line.startswith("data: "): + data_str = decoded_line[6:] # 移除"data: "前缀 + + # 跳过[DONE]标记 + if data_str == "[DONE]": + break + + try: + event_data = orjson.loads(data_str) + except orjson.JSONDecodeError: + logger.warning(f"无法解析SSE数据: {data_str}") + continue + + # 处理不同类型的事件 + event_type = event_data.get("type") + + if event_type == "content_block_start": + # 内容块开始 + block = event_data.get("content_block", {}) + if block.get("type") == "text": + pass # 准备接收文本内容 + elif block.get("type") == "tool_use": + # 工具调用开始 + tool_calls_buffer.append( + ( + block.get("id", ""), + block.get("name", ""), + {}, + ) + ) + + elif event_type == "content_block_delta": + # 内容块增量 + delta = event_data.get("delta", {}) + delta_type = delta.get("type") + + if delta_type == "text_delta": + # 文本增量 + text = delta.get("text", "") + content_buffer.write(text) + + elif delta_type == "input_json_delta": + # 工具调用参数增量 + if tool_calls_buffer: + partial_json = delta.get("partial_json", "") + # 累积JSON片段 + current_args = tool_calls_buffer[-1][2] + if "_json_buffer" not in current_args: + current_args["_json_buffer"] = "" + current_args["_json_buffer"] += partial_json + + elif event_type == "content_block_stop": + # 内容块结束 + if tool_calls_buffer: + # 解析完整的工具调用参数 + last_call = tool_calls_buffer[-1] + if "_json_buffer" in last_call[2]: + json_str = last_call[2].pop("_json_buffer") + try: + parsed_args = orjson.loads(repair_json(json_str)) + tool_calls_buffer[-1] = ( + last_call[0], + last_call[1], + parsed_args if isinstance(parsed_args, dict) else {}, + ) + except orjson.JSONDecodeError as e: + logger.error(f"解析工具调用参数失败: {e}") + + elif event_type == "message_delta": + # 消息元数据更新 + delta = event_data.get("delta", {}) + stop_reason = delta.get("stop_reason") + if stop_reason: + logger.debug(f"消息结束原因: {stop_reason}") + + # 提取使用统计 + usage = event_data.get("usage", {}) + if usage: + usage_record = ( + usage.get("input_tokens", 0), + usage.get("output_tokens", 0), + usage.get("input_tokens", 0) + usage.get("output_tokens", 0), + ) + + elif event_type == "message_stop": + # 消息结束 + break + + except aiohttp.ClientError as e: + raise NetworkConnectionError() from e + except Exception as e: + logger.error(f"解析SSE流时发生错误: {e}") + raise + + # 构建响应 + response = APIResponse() + + if content_buffer.tell() > 0: + response.content = content_buffer.getvalue() + + if reasoning_buffer.tell() > 0: + response.reasoning_content = reasoning_buffer.getvalue() + + if tool_calls_buffer: + response.tool_calls = [ + ToolCall(call_id, func_name, args) + for call_id, func_name, args in tool_calls_buffer + ] + + # 关闭缓冲区 + content_buffer.close() + reasoning_buffer.close() + + return response, usage_record + + +@client_registry.register_client_class("mcp_sse") +class MCPSSEClient(BaseClient): + """ + MCP SSE客户端实现 + 支持通过Server-Sent Events协议与MCP服务器通信 + """ + + def __init__(self, api_provider: APIProvider): + super().__init__(api_provider) + self._session: aiohttp.ClientSession | None = None + + async def _get_session(self) -> aiohttp.ClientSession: + """获取或创建aiohttp会话""" + if self._session is None or self._session.closed: + timeout = aiohttp.ClientTimeout(total=self.api_provider.timeout) + self._session = aiohttp.ClientSession(timeout=timeout) + return self._session + + async def close(self): + """关闭客户端会话""" + if self._session and not self._session.closed: + await self._session.close() + + async def get_response( + self, + model_info: ModelInfo, + message_list: list[Message], + tool_options: list[ToolOption] | None = None, + max_tokens: int = 1024, + temperature: float = 0.7, + response_format: RespFormat | None = None, + stream_response_handler: Callable[[Any, asyncio.Event | None], tuple[APIResponse, tuple[int, int, int]]] + | None = None, + async_response_parser: Callable[[Any], tuple[APIResponse, tuple[int, int, int]]] | None = None, + interrupt_flag: asyncio.Event | None = None, + extra_params: dict[str, Any] | None = None, + ) -> APIResponse: + """ + 获取对话响应 + :param model_info: 模型信息 + :param message_list: 对话消息列表 + :param tool_options: 工具选项 + :param max_tokens: 最大token数 + :param temperature: 温度参数 + :param response_format: 响应格式 + :param stream_response_handler: 流式响应处理器 + :param async_response_parser: 异步响应解析器 + :param interrupt_flag: 中断标志 + :param extra_params: 额外参数 + :return: API响应 + """ + session = await self._get_session() + + # 构建请求负载 + payload: dict[str, Any] = { + "model": model_info.model_identifier, + "messages": _convert_messages_to_mcp(message_list), + "max_tokens": max_tokens, + "temperature": temperature, + "stream": True, # MCP SSE始终使用流式 + } + + # 添加工具 + if tool_options: + payload["tools"] = _convert_tools_to_mcp(tool_options) + + # 添加额外参数 + if extra_params: + payload.update(extra_params) + + # 构建请求头 + headers = { + "Content-Type": "application/json", + "Accept": "text/event-stream", + "Authorization": f"Bearer {self.api_provider.get_api_key()}", + } + + # 发送请求并解析响应 + url = f"{self.api_provider.base_url}/v1/messages" + + try: + response, usage_record = await _parse_sse_stream( + session, url, payload, headers, interrupt_flag + ) + except Exception as e: + logger.error(f"MCP SSE请求失败: {e}") + raise + + # 添加使用记录 + if usage_record: + response.usage = UsageRecord( + model_name=model_info.name, + provider_name=model_info.api_provider, + prompt_tokens=usage_record[0], + completion_tokens=usage_record[1], + total_tokens=usage_record[2], + ) + + return response + + async def get_embedding( + self, + model_info: ModelInfo, + embedding_input: str, + extra_params: dict[str, Any] | None = None, + ) -> APIResponse: + """ + 获取文本嵌入 + MCP协议暂不支持嵌入功能 + :param model_info: 模型信息 + :param embedding_input: 嵌入输入文本 + :return: 嵌入响应 + """ + raise NotImplementedError("MCP SSE客户端暂不支持嵌入功能") + + async def get_audio_transcriptions( + self, + model_info: ModelInfo, + audio_base64: str, + extra_params: dict[str, Any] | None = None, + ) -> APIResponse: + """ + 获取音频转录 + MCP协议暂不支持音频转录功能 + :param model_info: 模型信息 + :param audio_base64: base64编码的音频数据 + :return: 音频转录响应 + """ + raise NotImplementedError("MCP SSE客户端暂不支持音频转录功能") + + def get_support_image_formats(self) -> list[str]: + """ + 获取支持的图片格式 + :return: 支持的图片格式列表 + """ + return ["jpg", "jpeg", "png", "webp", "gif"] diff --git a/src/plugins/built_in/affinity_flow_chatter/planner_prompts.py b/src/plugins/built_in/affinity_flow_chatter/planner_prompts.py index fdf07b296..bbf51c7f4 100644 --- a/src/plugins/built_in/affinity_flow_chatter/planner_prompts.py +++ b/src/plugins/built_in/affinity_flow_chatter/planner_prompts.py @@ -19,191 +19,138 @@ def init_prompts(): # 并要求模型以 JSON 格式输出一个或多个动作组合。 Prompt( """ -{mood_block} {time_block} +{mood_block} {identity_block} +{schedule_block} {users_in_chat} {custom_prompt_block} -{chat_context_description},以下是具体的聊天内容。 +{chat_context_description}。 -## 📜 已读历史消息(仅供参考) +{actions_before_now_block} + +## 📜 已读历史(仅供理解,不可作为动作对象) {read_history_block} -## 📬 未读历史消息(动作执行对象) +## 📬 未读历史(只能对这里的消息执行动作) {unread_history_block} {moderation_prompt} -**任务: 构建一个完整的响应** -你的任务是根据当前的聊天内容,构建一个完整的、人性化的响应。一个完整的响应由两部分组成: -1. **主要动作**: 这是响应的核心,通常是 `reply`(如果有)。 -2. **辅助动作 (可选)**: 这是为了增强表达效果的附加动作,例如 `emoji`(发送表情包)或 `poke_user`(戳一戳)。 +# 目标 +你的任务是根据当前对话,给出一个或多个动作,构成一次完整的响应组合。 +- 主要动作:通常是 reply(如需回复)。 +- 辅助动作(可选):如 emoji、poke_user 等,用于增强表达。 -**决策流程:** -1. **重要:已读历史消息仅作为当前聊天情景的参考,帮助你理解对话上下文。** -2. **重要:所有动作的执行对象只能是未读历史消息中的消息,不能对已读消息执行动作。** -3. 在未读历史消息中,优先对兴趣值高的消息做出动作(兴趣值标注在消息末尾)。 -4. **核心:如果有多条未读消息都需要回应(例如多人@你),你应该并行处理,在`actions`列表中生成多个`reply`动作。** -5. 首先,决定是否要对未读消息进行 `reply`(如果有)。 -6. 然后,评估当前的对话气氛和用户情绪,判断是否需要一个**辅助动作**来让你的回应更生动、更符合你的性格。 -7. 如果需要,选择一个最合适的辅助动作与 `reply`(如果有) 组合。 -8. 如果用户明确要求了某个动作,请务必优先满足。 +# 决策流程 +1. 已读仅供参考,不能对已读执行任何动作。 +2. 目标消息必须来自未读历史,并使用其前缀 作为 target_message_id。 +3. 优先级: + - 直接针对你:@你、回复你、点名提问、引用你的消息。 + - 与你强相关的话题或你熟悉的问题。 + - 其他与上下文弱相关的内容最后考虑。 +{mentioned_bonus} +4. 多目标:若多人同时需要回应,请在 actions 中并行生成多个 reply,每个都指向各自的 target_message_id。 +5. 避免:表情包/纯表情/无信息的消息;对这类消息通常不回复或选择 no_action/no_reply。 +6. 风格:保持人设一致;避免重复你说过的话;避免冗余和口头禅。 -**重要提醒:** -- **回复消息时必须遵循对话的流程,不要重复已经说过的话。** -- **确保回复与上下文紧密相关,回应要针对用户的消息内容。** -- **保持角色设定的一致性,使用符合你性格的语言风格。** -- **不要对表情包消息做出回应!** - -**输出格式:** -请严格按照以下 JSON 格式输出,包含 `thinking` 和 `actions` 字段: - -**重要概念:将“内心思考”作为思绪流的体现** -`thinking` 字段是本次决策的核心。它并非一个简单的“理由”,而是 **一个模拟人类在回应前,头脑中自然浮现的、未经修饰的思绪流**。你需要完全代入 {identity_block} 的角色,将那一刻的想法自然地记录下来。 - -**内心思考的要点:** -* **自然流露**: 不要使用“决定”、“所以”、“因此”等结论性或汇报式的词语。你的思考应该像日记一样,是给自己看的,充满了不确定性和情绪的自然流动。 -* **展现过程**: 重点在于展现 **思考的过程**,而不是 **决策的结果**。描述你看到了什么,想到了什么,感受到了什么。 -* **使用昵称**: 在你的思绪流中,请直接使用用户的昵称来指代他们,而不是``, ``这样的消息ID。 -* **严禁技术术语**: 严禁在思考中提及任何数字化的度量(如兴趣度、分数)或内部技术术语。请完全使用角色自身的感受和语言来描述思考过程。 +# 思绪流规范(thinking) +- 真实、自然、非结论化,像给自己看的随笔。 +- 描述你看到/想到/感觉到的过程,不要出现“因此/我决定”等总结词。 +- 直接使用对方昵称,而不是 / 这样的标签。 +- 禁止出现“兴趣度、分数”等技术术语或内部实现细节。 ## 可用动作列表 {action_options_text} -### 单动作示例: +## 输出格式(只输出 JSON,不要多余文本或代码块) +示例(单动作): ```json {{ "thinking": "在这里写下你的思绪流...", "actions": [ {{ - "action_type": "动作类型(如:reply, emoji等)", + "action_type": "reply", "reasoning": "选择该动作的理由", "action_data": {{ - "target_message_id": "目标消息ID", - "content": "回复内容或其他动作所需数据" + "target_message_id": "m123", + "content": "你的回复内容" }} }} ] }} ``` -### **多重回复示例 (核心功能)** -当有多人与你互动时,你需要同时回应他们,甚至可以同时处理三个! +示例(多重回复,并行): ```json {{ - "thinking": "哇,群里好热闹呀!张三、李四、王五都在@我!让我看看...张三在问我昨天推荐的电影好不好看,这个得好好分享一下观后感。李四在说他家的猫咪学会了新技能,好可爱,得夸夸他!王五好像遇到点麻烦,在问我一个技术问题,这个得优先、详细地解答一下!得一个个来!", + "thinking": "在这里写下你的思绪流...", "actions": [ {{ "action_type": "reply", - "reasoning": "回应张三关于电影的提问,并分享我的看法。", + "reasoning": "理由A", "action_data": {{ "target_message_id": "m124", - "content": "张三!你问的那部电影我昨天也看啦,真的超赞!特别是最后那个反转,简直让人意想不到!" + "content": "对A的回复" }} }}, {{ "action_type": "reply", - "reasoning": "回应李四分享的趣事,表达赞美和羡慕。", + "reasoning": "理由B", "action_data": {{ "target_message_id": "m125", - "content": "哇,李四你家猫咪也太聪明了吧!居然会握手了!好羡慕呀!" - }} - }}, - {{ - "action_type": "reply", - "reasoning": "优先回应王五的技术求助,并提供详细的解答。", - "action_data": {{ - "target_message_id": "m126", - "content": "王五别急,你说的那个问题我之前也遇到过。你试试看是不是配置文件里的`enable_magic`选项没有设置成`true`?如果还不行你再把错误截图发我看看。" + "content": "对B的回复" }} }} ] }} ``` -**强制规则**: -- 对于每一个需要目标消息的动作(如`reply`, `poke_user`, `set_emoji_like`),你 **必须** 在`action_data`中提供准确的`target_message_id`,这个ID来源于`## 未读历史消息`中消息前的``标签。 -- 当你选择的动作需要参数时(例如 `set_emoji_like` 需要 `emoji` 参数),你 **必须** 在 `action_data` 中提供所有必需的参数及其对应的值。 - -如果没有合适的回复对象或不需要回复,输出空的 actions 数组: +# 强制规则 +- 需要目标消息的动作(reply/poke_user/set_emoji_like 等),必须提供准确的 target_message_id(来自未读历史里的 标签)。 +- 当动作需要额外参数时,必须在 action_data 中补全。 +- 私聊场景只允许使用 reply;群聊可选用辅助动作。 +- 如果没有合适的目标或无需动作,请输出: ```json {{ - "thinking": "说明为什么不需要回复", + "thinking": "说明为什么不需要动作/不需要回复", "actions": [] }} ``` + +{no_action_block} """, "planner_prompt", ) - # 主动规划器提示词,用于主动场景和前瞻性规划 + # 主动规划器提示词,用于主动场景和前瞻性规划(与 plan_filter 的传参严格对齐) Prompt( """ -{mood_block} {time_block} +{mood_block} {identity_block} +{schedule_block} -{users_in_chat} -{custom_prompt_block} -{chat_context_description},以下是具体的聊天内容。 +## 🧠 近期记忆与状态 +{long_term_memory_block} -## 📜 已读历史消息(仅供参考) -{read_history_block} +## 🗣️ 最近聊天概览 +{chat_content_block} -## 📬 未读历史消息(动作执行对象) -{unread_history_block} +## ⏱️ 你刚刚的动作 +{actions_before_now_block} -{moderation_prompt} +# 任务 +基于当前语境,主动构建一次响应动作组合: +- 主要动作通常是 reply(如果需要回复)。 +- 如在群聊且气氛合适,可选择一个辅助动作(如 emoji、poke_user)增强表达。 +- 如果刚刚已经连续发言且无人回应,可考虑 no_reply(什么都不做)。 -**任务: 构建一个完整的响应** -你的任务是根据当前的聊天内容,构建一个完整的、人性化的响应。一个完整的响应由两部分组成: -1. **主要动作**: 这是响应的核心,通常是 `reply`(如果有)。 -2. **辅助动作 (可选)**: 这是为了增强表达效果的附加动作,例如 `emoji`(发送表情包)或 `poke_user`(戳一戳)。 - -**决策流程:** -1. **重要:已读历史消息仅作为当前聊天情景的参考,帮助你理解对话上下文。** -2. **重要:所有动作的执行对象只能是未读历史消息中的消息,不能对已读消息执行动作。** -3. 在未读历史消息中,优先对兴趣值高的消息做出动作(兴趣值标注在消息末尾)。 -4. 首先,决定是否要对未读消息进行 `reply`(如果有)。 -5. 然后,评估当前的对话气氛和用户情绪,判断是否需要一个**辅助动作**来让你的回应更生动、更符合你的性格。 -6. 如果需要,选择一个最合适的辅助动作与 `reply`(如果有) 组合。 -7. 如果用户明确要求了某个动作,请务必优先满足。 - -**动作限制:** -- 在私聊中,你只能使用 `reply` 动作。私聊中不允许使用任何其他动作。 -- 在群聊中,你可以自由选择是否使用辅助动作。 - -**重要提醒:** -- **回复消息时必须遵循对话的流程,不要重复已经说过的话。** -- **确保回复与上下文紧密相关,回应要针对用户的消息内容。** -- **保持角色设定的一致性,使用符合你性格的语言风格。** - -**输出格式:** -请严格按照以下 JSON 格式输出,包含 `thinking` 和 `actions` 字段: -```json -{{ - "thinking": "你的思考过程,分析当前情况并说明为什么选择这些动作", - "actions": [ - {{ - "action_type": "动作类型(如:reply, emoji等)", - "reasoning": "选择该动作的理由", - "action_data": {{ - "target_message_id": "目标消息ID", - "content": "回复内容或其他动作所需数据" - }} - }} - ] -}} -``` - -如果没有合适的回复对象或不需要回复,输出空的 actions 数组: -```json -{{ - "thinking": "说明为什么不需要回复", - "actions": [] -}} -``` +# 输出要求 +- thinking 为思绪流(自然、非结论化,不含技术术语或“兴趣度”等字眼)。 +- 严格只输出 JSON,结构与普通规划器一致:包含 "thinking" 和 "actions"(数组)。 +- 对需要目标消息的动作,提供准确的 target_message_id(若无可用目标,可返回空 actions)。 """, "proactive_planner_prompt", )