diff --git a/hermes_cli/tools_config.py b/hermes_cli/tools_config.py index b3df18d932..21439a28c1 100644 --- a/hermes_cli/tools_config.py +++ b/hermes_cli/tools_config.py @@ -56,6 +56,7 @@ CONFIGURABLE_TOOLSETS = [ ("file", "📁 File Operations", "read, write, patch, search"), ("code_execution", "⚡ Code Execution", "execute_code"), ("vision", "👁️ Vision / Image Analysis", "vision_analyze"), + ("video", "🎬 Video Analysis", "video_analyze (requires video-capable model)"), ("image_gen", "🎨 Image Generation", "image_generate"), ("moa", "🧠 Mixture of Agents", "mixture_of_agents"), ("tts", "🔊 Text-to-Speech", "text_to_speech"), @@ -78,7 +79,7 @@ CONFIGURABLE_TOOLSETS = [ # Toolsets that are OFF by default for new installs. # They're still in _HERMES_CORE_TOOLS (available at runtime if enabled), # but the setup checklist won't pre-select them for first-time users. -_DEFAULT_OFF_TOOLSETS = {"moa", "homeassistant", "rl", "spotify", "discord", "discord_admin"} +_DEFAULT_OFF_TOOLSETS = {"moa", "homeassistant", "rl", "spotify", "discord", "discord_admin", "video"} # Platform-scoped toolsets: only appear in the `hermes tools` checklist for # these platforms, and only resolve/save for these platforms. 
# (tail of the preceding hermes_cli/tools_config.py hunk context: "A toolset")
# diff --git a/tests/tools/test_video_analyze.py b/tests/tools/test_video_analyze.py
# new file mode 100644
# index 0000000000..62987d96b2
# --- /dev/null
# +++ b/tests/tools/test_video_analyze.py
# @@ -0,0 +1,337 @@
"""Tests for video_analyze tool in tools/vision_tools.py."""

import asyncio
import json
import os
from pathlib import Path
from typing import Awaitable
from unittest.mock import AsyncMock, MagicMock, patch

import pytest

from tools.vision_tools import (
    _detect_video_mime_type,
    _video_to_base64_data_url,
    _handle_video_analyze,
    _MAX_VIDEO_BASE64_BYTES,
    _VIDEO_MIME_TYPES,
    _VIDEO_SIZE_WARN_BYTES,
    video_analyze_tool,
    VIDEO_ANALYZE_SCHEMA,
)


def _run_coro(coro: Awaitable):
    """Run *coro* to completion on a private event loop and return its result.

    ``asyncio.get_event_loop()`` depends on an implicit "current" loop: it is
    deprecated when no loop is running and raises ``RuntimeError`` once any
    earlier test has called ``asyncio.run()`` / ``set_event_loop(None)``.
    A fresh loop per call keeps these tests independent of global loop state
    and does not disturb it for other test modules either.
    """
    loop = asyncio.new_event_loop()
    try:
        return loop.run_until_complete(coro)
    finally:
        loop.close()


# ---------------------------------------------------------------------------
# _detect_video_mime_type
# ---------------------------------------------------------------------------


class TestDetectVideoMimeType:
    """Extension-based MIME detection for video files."""

    @pytest.mark.parametrize(
        ("filename", "expected"),
        [
            ("clip.mp4", "video/mp4"),
            ("clip.webm", "video/webm"),
            ("clip.mov", "video/mov"),
            ("clip.avi", "video/mp4"),    # avi falls back to mp4
            ("clip.mkv", "video/mp4"),    # mkv falls back to mp4
            ("clip.mpeg", "video/mpeg"),
            ("clip.mpg", "video/mpeg"),
            ("clip.flv", None),           # unsupported extension
            ("clip.MP4", "video/mp4"),    # case-insensitive
        ],
    )
    def test_extension_mapping(self, tmp_path, filename, expected):
        p = tmp_path / filename
        p.write_bytes(b"\x00" * 10)
        assert _detect_video_mime_type(p) == expected


# ---------------------------------------------------------------------------
# _video_to_base64_data_url
# ---------------------------------------------------------------------------


class TestVideoToBase64DataUrl:
    """Base64 encoding of video files."""

    def test_produces_data_url(self, tmp_path):
        p = tmp_path / "test.mp4"
        p.write_bytes(b"\x00\x01\x02\x03")
        result = _video_to_base64_data_url(p)
        assert result.startswith("data:video/mp4;base64,")

    def test_custom_mime_type(self, tmp_path):
        p = tmp_path / "test.webm"
        p.write_bytes(b"\x00\x01\x02\x03")
        result = _video_to_base64_data_url(p, mime_type="video/webm")
        assert result.startswith("data:video/webm;base64,")

    def test_default_mime_for_unknown_ext(self, tmp_path):
        p = tmp_path / "test.xyz"
        p.write_bytes(b"\x00\x01\x02\x03")
        result = _video_to_base64_data_url(p)
        # Falls back to video/mp4
        assert result.startswith("data:video/mp4;base64,")


# ---------------------------------------------------------------------------
# Schema validation
# ---------------------------------------------------------------------------


class TestVideoAnalyzeSchema:
    """Schema structure is correct."""

    def test_schema_name(self):
        assert VIDEO_ANALYZE_SCHEMA["name"] == "video_analyze"

    def test_schema_has_required_fields(self):
        params = VIDEO_ANALYZE_SCHEMA["parameters"]
        assert "video_url" in params["properties"]
        assert "question" in params["properties"]
        assert params["required"] == ["video_url", "question"]

    def test_schema_description_mentions_video(self):
        assert "video" in VIDEO_ANALYZE_SCHEMA["description"].lower()


# ---------------------------------------------------------------------------
# _handle_video_analyze handler
# ---------------------------------------------------------------------------


class TestHandleVideoAnalyze:
    """Tests for the registry handler wrapper."""

    def test_returns_awaitable(self, tmp_path, monkeypatch):
        video_file = tmp_path / "test.mp4"
        video_file.write_bytes(b"\x00" * 100)
        monkeypatch.setenv("AUXILIARY_VIDEO_MODEL", "")
        monkeypatch.setenv("AUXILIARY_VISION_MODEL", "")

        with patch("tools.vision_tools.video_analyze_tool", new_callable=AsyncMock) as mock_tool:
            mock_tool.return_value = json.dumps({"success": True, "analysis": "test"})
            result = _handle_video_analyze({"video_url": str(video_file), "question": "what is this?"})
            # Should return an awaitable (coroutine)
            assert asyncio.iscoroutine(result)
            # Clean up the unawaited coroutine
            result.close()

    def test_uses_auxiliary_video_model_env(self, tmp_path, monkeypatch):
        monkeypatch.setenv("AUXILIARY_VIDEO_MODEL", "google/gemini-2.5-flash")
        monkeypatch.setenv("AUXILIARY_VISION_MODEL", "other-model")

        with patch("tools.vision_tools.video_analyze_tool", new_callable=AsyncMock) as mock_tool:
            mock_tool.return_value = json.dumps({"success": True, "analysis": "ok"})
            _run_coro(
                _handle_video_analyze({"video_url": "/tmp/test.mp4", "question": "test"})
            )
            args = mock_tool.call_args[0]
            assert args[2] == "google/gemini-2.5-flash"

    def test_falls_back_to_vision_model_env(self, tmp_path, monkeypatch):
        monkeypatch.setenv("AUXILIARY_VIDEO_MODEL", "")
        monkeypatch.setenv("AUXILIARY_VISION_MODEL", "google/gemini-flash")

        with patch("tools.vision_tools.video_analyze_tool", new_callable=AsyncMock) as mock_tool:
            mock_tool.return_value = json.dumps({"success": True, "analysis": "ok"})
            _run_coro(
                _handle_video_analyze({"video_url": "/tmp/test.mp4", "question": "test"})
            )
            args = mock_tool.call_args[0]
            assert args[2] == "google/gemini-flash"


# ---------------------------------------------------------------------------
# video_analyze_tool — integration-style tests with mocked LLM
# ---------------------------------------------------------------------------


class TestVideoAnalyzeTool:
    """Core video analysis function tests."""

    def _run(self, coro):
        return _run_coro(coro)

    def test_local_file_success(self, tmp_path, monkeypatch):
        """Analyze a local video file — happy path."""
        video = tmp_path / "demo.mp4"
        video.write_bytes(b"\x00" * 1024)

        mock_response = MagicMock()
        mock_response.choices = [MagicMock()]
        mock_response.choices[0].message.content = "A short video showing a demo."

        with patch("tools.vision_tools.async_call_llm", new_callable=AsyncMock, return_value=mock_response):
            with patch("tools.vision_tools.extract_content_or_reasoning", return_value="A short video showing a demo."):
                result = self._run(video_analyze_tool(str(video), "What is this?"))

        data = json.loads(result)
        assert data["success"] is True
        assert "demo" in data["analysis"].lower()

    def test_local_file_not_found(self, tmp_path):
        """Non-existent file is reported as a failure payload (no exception escapes)."""
        result = self._run(video_analyze_tool("/nonexistent/video.mp4", "What?"))
        data = json.loads(result)
        assert data["success"] is False
        assert "invalid video source" in data["analysis"].lower()

    def test_unsupported_format(self, tmp_path):
        """Unsupported extension is reported as a failure payload."""
        video = tmp_path / "clip.flv"
        video.write_bytes(b"\x00" * 100)

        result = self._run(video_analyze_tool(str(video), "What is this?"))
        data = json.loads(result)
        assert data["success"] is False
        assert "unsupported video format" in data["analysis"].lower()

    def test_video_too_large(self, tmp_path, monkeypatch):
        """Video exceeding max size is rejected."""
        video = tmp_path / "huge.mp4"
        # Don't actually write 50MB — the file stays tiny and the encoder is
        # patched below to hand back an oversized payload instead.
        video.write_bytes(b"\x00" * 100)

        # Patch the base64 encoding to return something huge
        with patch("tools.vision_tools._video_to_base64_data_url") as mock_encode:
            mock_encode.return_value = "data:video/mp4;base64," + "A" * (_MAX_VIDEO_BASE64_BYTES + 1)
            result = self._run(video_analyze_tool(str(video), "What?"))

        data = json.loads(result)
        assert data["success"] is False
        assert "too large" in data["analysis"].lower()

    def test_interrupt_check(self, tmp_path):
        """Tool respects interrupt flag."""
        video = tmp_path / "test.mp4"
        video.write_bytes(b"\x00" * 100)

        with patch("tools.interrupt.is_interrupted", return_value=True):
            result = self._run(video_analyze_tool(str(video), "What?"))

        data = json.loads(result)
        assert data["success"] is False

    def test_empty_response_retries(self, tmp_path):
        """Retries once on empty model response."""
        video = tmp_path / "test.mp4"
        video.write_bytes(b"\x00" * 100)

        call_count = 0
        mock_response = MagicMock()
        mock_response.choices = [MagicMock()]
        mock_response.choices[0].message.content = "Video analysis result."

        async def fake_llm(**kwargs):
            nonlocal call_count
            call_count += 1
            return mock_response

        with patch("tools.vision_tools.async_call_llm", side_effect=fake_llm):
            with patch("tools.vision_tools.extract_content_or_reasoning", side_effect=["", "Video analysis result."]):
                result = self._run(video_analyze_tool(str(video), "What?"))

        data = json.loads(result)
        assert data["success"] is True
        assert call_count == 2  # Initial call + retry

    def test_file_scheme_stripped(self, tmp_path):
        """file:// prefix is stripped correctly."""
        video = tmp_path / "test.mp4"
        video.write_bytes(b"\x00" * 100)

        mock_response = MagicMock()
        mock_response.choices = [MagicMock()]
        mock_response.choices[0].message.content = "OK"

        with patch("tools.vision_tools.async_call_llm", new_callable=AsyncMock, return_value=mock_response):
            with patch("tools.vision_tools.extract_content_or_reasoning", return_value="OK"):
                result = self._run(video_analyze_tool(f"file://{video}", "What?"))

        data = json.loads(result)
        assert data["success"] is True

    def test_api_message_format(self, tmp_path):
        """Verify the message sent to LLM uses video_url content type."""
        video = tmp_path / "test.mp4"
        video.write_bytes(b"\x00" * 100)

        captured_kwargs = {}

        async def capture_llm(**kwargs):
            captured_kwargs.update(kwargs)
            mock_response = MagicMock()
            mock_response.choices = [MagicMock()]
            mock_response.choices[0].message.content = "OK"
            return mock_response

        with patch("tools.vision_tools.async_call_llm", side_effect=capture_llm):
            with patch("tools.vision_tools.extract_content_or_reasoning", return_value="OK"):
                self._run(video_analyze_tool(str(video), "Describe this"))

        messages = captured_kwargs["messages"]
        assert len(messages) == 1
        content = messages[0]["content"]
        assert len(content) == 2
        assert content[0]["type"] == "text"
        assert content[1]["type"] == "video_url"
        assert "video_url" in content[1]
        assert content[1]["video_url"]["url"].startswith("data:video/mp4;base64,")


# ---------------------------------------------------------------------------
# Toolset registration
# ---------------------------------------------------------------------------


class TestVideoToolsetRegistration:
    """Verify the tool is registered correctly."""

    def test_registered_in_video_toolset(self):
        from tools.registry import registry
        entry = registry.get_entry("video_analyze")
        assert entry is not None
        assert entry.toolset == "video"
        assert entry.is_async is True
        assert entry.emoji == "🎬"

    def test_not_in_core_tools(self):
        """video_analyze should NOT be in _HERMES_CORE_TOOLS (default disabled)."""
        from toolsets import _HERMES_CORE_TOOLS
        assert "video_analyze" not in _HERMES_CORE_TOOLS

    def test_in_video_toolset_definition(self):
        """Toolset 'video' should contain video_analyze."""
        from toolsets import TOOLSETS
        assert "video" in TOOLSETS
        assert "video_analyze" in TOOLSETS["video"]["tools"]


# diff --git a/tools/vision_tools.py b/tools/vision_tools.py
# index 233b737272..e7389e3efa 100644
# --- a/tools/vision_tools.py
# +++ b/tools/vision_tools.py
# @@ -801,3 +801,364 @@ registry.register(
#      is_async=True,
#      emoji="👁️",
#  )


# ---------------------------------------------------------------------------
# Video Analysis Tool
# ---------------------------------------------------------------------------

# Extension → MIME. avi/mkv fall back to mp4.
_VIDEO_MIME_TYPES = {
    ".mp4": "video/mp4",
    ".webm": "video/webm",
    ".mov": "video/mov",
    ".avi": "video/mp4",
    ".mkv": "video/mp4",
    ".mpeg": "video/mpeg",
    ".mpg": "video/mpeg",
}

_MAX_VIDEO_BASE64_BYTES = 50 * 1024 * 1024  # 50 MB hard cap
_VIDEO_SIZE_WARN_BYTES = 20 * 1024 * 1024


def _detect_video_mime_type(video_path: Path) -> Optional[str]:
    """Return a video MIME type based on file extension, or None if unsupported."""
    ext = video_path.suffix.lower()
    return _VIDEO_MIME_TYPES.get(ext)


def _video_to_base64_data_url(video_path: Path, mime_type: Optional[str] = None) -> str:
    """Convert a video file to a base64-encoded data URL.

    Args:
        video_path: File to read; the whole file is loaded into memory.
        mime_type: MIME type to embed. When omitted it is looked up from the
            file extension, defaulting to ``video/mp4`` for unknown extensions.

    Returns:
        A ``data:<mime>;base64,<payload>`` string.
    """
    data = video_path.read_bytes()
    encoded = base64.b64encode(data).decode("ascii")
    mime = mime_type or _VIDEO_MIME_TYPES.get(video_path.suffix.lower(), "video/mp4")
    return f"data:{mime};base64,{encoded}"


def _remote_video_suffix(video_url: str) -> str:
    """Choose the temp-file suffix for a downloaded video from its URL path.

    MIME detection is extension-based, so the downloaded file must keep the
    remote extension when it is a supported one. Anything else falls back to
    ``.mp4``.
    """
    from urllib.parse import urlparse

    suffix = Path(urlparse(video_url).path).suffix.lower()
    return suffix if suffix in _VIDEO_MIME_TYPES else ".mp4"


def _classify_video_error(exc: Exception) -> str:
    """Map an exception to the user-facing ``analysis`` text for a failed call."""
    import re

    err_str = str(exc).lower()

    def _has_status(code: str) -> bool:
        # Match the status code only as a standalone number. A plain substring
        # test would also hit byte counts such as "54026000 bytes" in our own
        # size errors and misreport them as billing problems.
        return re.search(rf"(?<!\d){code}(?!\d)", err_str) is not None

    def _mentions(*hints: str) -> bool:
        return any(hint in err_str for hint in hints)

    if _has_status("402") or _mentions(
        "insufficient", "payment required", "credits", "billing",
    ):
        return (
            "Insufficient credits or payment required. Please top up your "
            f"API provider account and try again. Error: {exc}"
        )
    if _mentions(
        "does not support", "not support video",
        "content_policy", "multimodal",
        "unrecognized request argument", "video input",
        "video_url",
    ):
        return (
            f"The model does not support video analysis or the request was "
            f"rejected. Ensure you're using a video-capable model "
            f"(e.g. google/gemini-2.5-flash). Error: {exc}"
        )
    if _has_status("413") or _mentions(
        "too large", "payload", "content_too_large",
        "request_too_large", "exceeds", "size limit",
    ):
        return (
            "The video is too large for the API. Try compressing or trimming "
            f"the video (max ~50 MB). Error: {exc}"
        )
    return (
        "There was a problem with the request and the video could not "
        f"be analyzed. Error: {exc}"
    )


async def _download_video(video_url: str, destination: Path, max_retries: int = 3) -> Path:
    """Download video from URL with SSRF protection and retry.

    The body is streamed to *destination* and the transfer is aborted as soon
    as it exceeds ``_MAX_VIDEO_BASE64_BYTES``, so an oversized (or
    length-less) response is never buffered in memory.

    Args:
        video_url: HTTP(S) URL to fetch.
        destination: File to write; parent directories are created.
        max_retries: Number of attempts for transient failures.

    Returns:
        *destination*, once the file has been written.

    Raises:
        PermissionError: The URL (or the final URL after redirects) is blocked
            by the website access policy. Not retried.
        ValueError: A redirect targets an unsafe address or the video exceeds
            the size cap. Not retried.
        Exception: The last transient error once all attempts are exhausted.
        RuntimeError: ``max_retries`` was not positive, so nothing was tried.
    """
    import asyncio

    destination.parent.mkdir(parents=True, exist_ok=True)

    async def _ssrf_redirect_guard(response):
        if not response.is_redirect:
            return
        if response.next_request is not None:
            redirect_url = str(response.next_request.url)
        else:
            # NOTE(review): httpx appears to populate ``next_request`` only
            # when redirects are NOT followed, so with follow_redirects=True
            # the target has to be resolved from the Location header here.
            location = response.headers.get("location")
            if not location:
                return
            redirect_url = str(response.url.join(location))
        from tools.url_safety import is_safe_url
        if not is_safe_url(redirect_url):
            raise ValueError(
                f"Blocked redirect to private/internal address: {redirect_url}"
            )

    def _discard_partial() -> None:
        # Best effort: never leave a truncated file behind for a failed attempt.
        try:
            destination.unlink()
        except OSError:
            pass

    last_error = None
    for attempt in range(max_retries):
        try:
            blocked = check_website_access(video_url)
            if blocked:
                raise PermissionError(blocked["message"])

            async with httpx.AsyncClient(
                timeout=60.0,
                follow_redirects=True,
                event_hooks={"response": [_ssrf_redirect_guard]},
            ) as client:
                async with client.stream(
                    "GET",
                    video_url,
                    headers={
                        "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36",
                        "Accept": "video/*,*/*;q=0.8",
                    },
                ) as response:
                    response.raise_for_status()

                    # Cheap early exit when the server declares the size. A
                    # missing or malformed header is ignored because the
                    # streaming cap below enforces the limit regardless.
                    cl = response.headers.get("content-length")
                    if cl and cl.isdigit() and int(cl) > _MAX_VIDEO_BASE64_BYTES:
                        raise ValueError(
                            f"Video too large ({int(cl)} bytes, max {_MAX_VIDEO_BASE64_BYTES})"
                        )

                    final_url = str(response.url)
                    blocked = check_website_access(final_url)
                    if blocked:
                        raise PermissionError(blocked["message"])

                    received = 0
                    with destination.open("wb") as out:
                        async for chunk in response.aiter_bytes():
                            received += len(chunk)
                            if received > _MAX_VIDEO_BASE64_BYTES:
                                raise ValueError(
                                    f"Video too large ({received} bytes received, max {_MAX_VIDEO_BASE64_BYTES})"
                                )
                            out.write(chunk)

            return destination
        except (PermissionError, ValueError) as e:
            # Policy and size rejections are deterministic; retrying with
            # backoff only delays the same failure.
            _discard_partial()
            logger.error("Video download rejected: %s", str(e)[:100])
            raise
        except Exception as e:
            last_error = e
            _discard_partial()
            if attempt < max_retries - 1:
                wait_time = 2 ** (attempt + 1)
                logger.warning("Video download failed (attempt %s/%s): %s", attempt + 1, max_retries, str(e)[:50])
                await asyncio.sleep(wait_time)
            else:
                logger.error(
                    "Video download failed after %s attempts: %s",
                    max_retries, str(e)[:100], exc_info=True,
                )

    if last_error is None:
        raise RuntimeError(
            f"_download_video exited retry loop without attempting (max_retries={max_retries})"
        )
    raise last_error


async def video_analyze_tool(
    video_url: str,
    user_prompt: str,
    model: Optional[str] = None,
) -> str:
    """Analyze a video via multimodal LLM. Returns JSON {success, analysis}.

    Args:
        video_url: Local path (optionally ``file://``-prefixed, ``~`` expanded)
            or a remote URL accepted by ``_validate_image_url``.
        user_prompt: Prompt sent alongside the video.
        model: Optional model override; omitted from the LLM call when falsy.

    Returns:
        A JSON string. On success ``{"success": true, "analysis": ...}``; on
        failure ``{"success": false, "error": ..., "analysis": ...}``.
        Exceptions are converted into the failure payload rather than raised.
    """
    debug_call_data = {
        "parameters": {
            "video_url": video_url,
            "user_prompt": (user_prompt[:200] + "...") if len(user_prompt) > 200 else user_prompt,
            "model": model,
        },
        "error": None,
        "success": False,
        "analysis_length": 0,
        "model_used": model,
        "video_size_bytes": 0,
    }

    temp_video_path = None
    should_cleanup = True

    try:
        from tools.interrupt import is_interrupted
        if is_interrupted():
            return tool_error("Interrupted", success=False)

        logger.info("Analyzing video: %s", video_url[:60])
        logger.info("User prompt: %s", user_prompt[:100])

        # Resolve local path vs remote URL
        resolved_url = video_url
        if resolved_url.startswith("file://"):
            resolved_url = resolved_url[len("file://"):]
        local_path = Path(os.path.expanduser(resolved_url))

        if local_path.is_file():
            logger.info("Using local video file: %s", video_url)
            temp_video_path = local_path
            should_cleanup = False  # never delete the caller's own file
        elif _validate_image_url(video_url):
            blocked = check_website_access(video_url)
            if blocked:
                raise PermissionError(blocked["message"])
            temp_dir = get_hermes_dir("cache/video", "temp_video_files")
            # Keep the remote extension so a .webm/.mov download is not
            # mislabelled as video/mp4 by the extension-based MIME lookup.
            temp_video_path = temp_dir / f"temp_video_{uuid.uuid4()}{_remote_video_suffix(video_url)}"
            await _download_video(video_url, temp_video_path)
            should_cleanup = True
        else:
            raise ValueError(
                "Invalid video source. Provide an HTTP/HTTPS URL or a valid local file path."
            )

        video_size_bytes = temp_video_path.stat().st_size
        video_size_mb = video_size_bytes / (1024 * 1024)
        debug_call_data["video_size_bytes"] = video_size_bytes
        logger.info("Video ready (%.1f MB)", video_size_mb)

        detected_mime = _detect_video_mime_type(temp_video_path)
        if not detected_mime:
            raise ValueError(
                f"Unsupported video format: '{temp_video_path.suffix}'. "
                f"Supported: {', '.join(sorted(_VIDEO_MIME_TYPES.keys()))}"
            )

        if video_size_bytes > _VIDEO_SIZE_WARN_BYTES:
            logger.warning("Video is %.1f MB — may be slow or rejected", video_size_mb)

        # base64 turns every 3 input bytes into 4 output bytes. Reject files
        # that cannot fit BEFORE reading and encoding them, so a huge local
        # file is never pulled into memory just to be refused.
        projected_b64_bytes = 4 * ((video_size_bytes + 2) // 3)
        if projected_b64_bytes > _MAX_VIDEO_BASE64_BYTES:
            raise ValueError(
                f"Video too large for API: base64 payload would be "
                f"{projected_b64_bytes / (1024 * 1024):.1f} MB "
                f"(limit {_MAX_VIDEO_BASE64_BYTES / (1024 * 1024):.0f} MB). "
                f"Compress or trim the video and retry."
            )

        video_data_url = _video_to_base64_data_url(temp_video_path, mime_type=detected_mime)
        data_size_mb = len(video_data_url) / (1024 * 1024)

        # Final check on the real payload (includes the data-URL prefix).
        if len(video_data_url) > _MAX_VIDEO_BASE64_BYTES:
            raise ValueError(
                f"Video too large for API: base64 payload is {data_size_mb:.1f} MB "
                f"(limit {_MAX_VIDEO_BASE64_BYTES / (1024 * 1024):.0f} MB). "
                f"Compress or trim the video and retry."
            )

        messages = [
            {
                "role": "user",
                "content": [
                    {
                        "type": "text",
                        "text": user_prompt,
                    },
                    {
                        "type": "video_url",
                        "video_url": {
                            "url": video_data_url,
                        },
                    },
                ],
            }
        ]

        vision_timeout = 180.0
        vision_temperature = 0.1
        try:
            from hermes_cli.config import cfg_get, load_config
            _cfg = load_config()
            _vision_cfg = cfg_get(_cfg, "auxiliary", "vision", default={})
            _vt = _vision_cfg.get("timeout")
            if _vt is not None:
                # A configured timeout may raise the 180 s floor, never lower it.
                vision_timeout = max(float(_vt), 180.0)
            _vtemp = _vision_cfg.get("temperature")
            if _vtemp is not None:
                vision_temperature = float(_vtemp)
        except Exception as cfg_error:
            # Config is optional here: keep the defaults, but leave a trace.
            logger.debug("Could not read auxiliary.vision config, using defaults: %s", cfg_error)

        call_kwargs = {
            "task": "vision",
            "messages": messages,
            "temperature": vision_temperature,
            "max_tokens": 4000,
            "timeout": vision_timeout,
        }
        if model:
            call_kwargs["model"] = model

        response = await async_call_llm(**call_kwargs)
        analysis = extract_content_or_reasoning(response)

        if not analysis:
            logger.warning("Empty video response, retrying once")
            response = await async_call_llm(**call_kwargs)
            analysis = extract_content_or_reasoning(response)

        analysis_length = len(analysis) if analysis else 0
        logger.info("Video analysis completed (%s characters)", analysis_length)

        # NOTE(review): an empty reply after the retry is still reported with
        # success=True plus a fallback message; kept as-is for compatibility.
        result = {
            "success": True,
            "analysis": analysis or "There was a problem with the request and the video could not be analyzed.",
        }

        debug_call_data["success"] = True
        debug_call_data["analysis_length"] = analysis_length
        _debug.log_call("video_analyze_tool", debug_call_data)
        _debug.save()

        return json.dumps(result, indent=2, ensure_ascii=False)

    except Exception as e:
        error_msg = f"Error analyzing video: {str(e)}"
        logger.error("%s", error_msg, exc_info=True)

        result = {
            "success": False,
            "error": error_msg,
            "analysis": _classify_video_error(e),
        }

        debug_call_data["error"] = error_msg
        _debug.log_call("video_analyze_tool", debug_call_data)
        _debug.save()

        return json.dumps(result, indent=2, ensure_ascii=False)

    finally:
        if should_cleanup and temp_video_path and temp_video_path.exists():
            try:
                temp_video_path.unlink()
                logger.debug("Cleaned up temporary video file")
            except Exception as cleanup_error:
                logger.warning(
                    "Could not delete temporary file: %s", cleanup_error, exc_info=True
                )


VIDEO_ANALYZE_SCHEMA = {
    "name": "video_analyze",
    "description": (
        "Analyze a video from a URL or local file path using a multimodal AI model. "
        "Sends the video to a video-capable model (e.g. Gemini) for understanding. "
        "Use this for video files — for images, use vision_analyze instead. "
        "Supports mp4, webm, mov, avi, mkv, mpeg formats. "
        "Note: large videos (>20 MB) may be slow; max ~50 MB."
    ),
    "parameters": {
        "type": "object",
        "properties": {
            "video_url": {
                "type": "string",
                "description": "Video URL (http/https) or local file path to analyze.",
            },
            "question": {
                "type": "string",
                "description": "Your specific question about the video. The AI will describe what happens in the video and answer your question.",
            },
        },
        "required": ["video_url", "question"],
    },
}


def _handle_video_analyze(args: Dict[str, Any], **kw: Any) -> Awaitable[str]:
    """Registry handler: build the prompt, pick the model, and start the analysis.

    The model comes from ``AUXILIARY_VIDEO_MODEL``, then
    ``AUXILIARY_VISION_MODEL``; blank values are treated as unset. Returns the
    un-awaited coroutine from ``video_analyze_tool``.
    """
    video_url = args.get("video_url", "")
    question = args.get("question", "")
    full_prompt = (
        "Fully describe and explain everything happening in this video, "
        "including visual content, motion, audio cues, text overlays, and scene "
        f"transitions. Then answer the following question:\n\n{question}"
    )
    model = os.getenv("AUXILIARY_VIDEO_MODEL", "").strip() or os.getenv("AUXILIARY_VISION_MODEL", "").strip() or None
    return video_analyze_tool(video_url, full_prompt, model)


registry.register(
    name="video_analyze",
    toolset="video",
    schema=VIDEO_ANALYZE_SCHEMA,
    handler=_handle_video_analyze,
    check_fn=check_vision_requirements,
    is_async=True,
    emoji="🎬",
)

# diff --git a/toolsets.py b/toolsets.py
# index 57e226d3c0..2a77f615ce 100644
# --- a/toolsets.py
# +++ b/toolsets.py
# @@ -89,6 +89,12 @@ TOOLSETS = {
#          "tools": ["vision_analyze"],
#          "includes": []
#      },
# +
# +    "video": {
# +        "description": "Video analysis and understanding tools (opt-in, not in default toolset)",
# +        "tools": ["video_analyze"],
# +        "includes": []
# +    },
#
#      "image_gen": {
#          "description": "Creative generation tools (images)",