Files
chatgpt-on-wechat/agent/tools/vision/vision.py
zhayujie 26693acc3f feat(vision): prioritize main model for image recognition with multi-provider fallback
- Add call_vision method to all bot implementations (DashScope, Claude,
  Gemini, ZhipuAI, MiniMax, Doubao, Moonshot, OpenAICompatibleBot)
  using each vendor's native multimodal API format
- Remove call_with_tools/call_vision from Bot base class to fix MRO
  shadowing issue with OpenAICompatibleBot mixin
- Refactor vision tool provider resolution: MainModel → other configured
  models (auto-discovered) → OpenAI → LinkAI, with automatic fallback
- Return actual model name used in call_vision responses
- Sync config.json API keys to .env bidirectionally on startup
- Fix bot instance cache to detect bot_type/use_linkai config changes
- Add SSE reconnection support for web console
- Preserve image path hints in Gemini text for correct vision tool calls
- Update docs/tools/vision.mdx
2026-04-11 19:46:11 +08:00

513 lines
20 KiB
Python

"""
Vision tool - Analyze images using Vision API.
Supports local files (auto base64-encoded) and HTTP URLs.
Provider priority (default):
1. Main model via bot.call_vision — zero extra cost
2. Other models whose API key is configured — auto-discovered
3. OpenAI / LinkAI raw HTTP — reliable fallback
When use_linkai=true, LinkAI is promoted to #1.
When tool.vision.model is set, the MainModel provider uses that model name instead of the configured main model.
"""
import base64
import os
import subprocess
import tempfile
from dataclasses import dataclass, field
from typing import Any, Dict, List, Optional
import requests
from agent.tools.base_tool import BaseTool, ToolResult
from common import const
from common.log import logger
from config import conf
# Model name used for providers that do not carry their own model_override.
DEFAULT_MODEL = const.GPT_41_MINI
# Timeout in seconds for the raw HTTP chat-completions request.
DEFAULT_TIMEOUT = 60
# Passed as max_tokens to bot.call_vision.
MAX_TOKENS = 1000
# Local image files larger than this many bytes are compressed before encoding.
COMPRESS_THRESHOLD = 1_048_576 # 1 MB
# Accepted local-file extensions (without the dot) mapped to their MIME type.
SUPPORTED_EXTENSIONS = {
"jpg": "image/jpeg",
"jpeg": "image/jpeg",
"png": "image/png",
"gif": "image/gif",
"webp": "image/webp",
}
# Provider name reported when the main model's bot handles the request.
_MAIN_MODEL_PROVIDER_NAME = "MainModel"
# Each entry is a 4-tuple, unpacked positionally by the discovery loop:
# (config_key_for_api_key, bot_type, default_vision_model, provider_display_name)
# Auto-discovered as fallback vision providers when their API key is configured.
# OpenAI and LinkAI are handled separately (raw HTTP providers), so not listed here.
_DISCOVERABLE_MODELS = [
("moonshot_api_key", const.MOONSHOT, const.KIMI_K2_5, "Moonshot"),
("ark_api_key", const.DOUBAO, const.DOUBAO_SEED_2_PRO, "Doubao"),
("dashscope_api_key", const.QWEN_DASHSCOPE, const.QWEN36_PLUS, "DashScope"),
("claude_api_key", const.CLAUDEAPI, const.CLAUDE_4_6_SONNET, "Claude"),
("gemini_api_key", const.GEMINI, const.GEMINI_31_FLASH_LITE_PRE, "Gemini"),
("zhipu_ai_api_key", const.ZHIPU_AI, const.GLM_4_7, "ZhipuAI"),
("minimax_api_key", const.MiniMax, const.MINIMAX_M2_7, "MiniMax"),
]
@dataclass
class VisionProvider:
    """
    Configuration for one Vision API provider.

    A provider is reached either through raw HTTP (``api_key`` / ``api_base``
    / ``extra_headers``) or through a bot object's ``call_vision`` method
    (``use_bot=True``).
    """

    name: str
    api_key: str
    api_base: str
    extra_headers: dict = field(default_factory=dict)
    model_override: Optional[str] = None
    # When True, call via bot.call_vision instead of raw HTTP
    use_bot: bool = False
    # Bot instance for non-main-model providers
    fallback_bot: Any = None
class VisionAPIError(Exception):
    """Signal a failed Vision API call so the caller can fall back to the next provider."""
class Vision(BaseTool):
    """
    Analyze images using a Vision API.

    The tool resolves an ordered list of providers, converts the image into a
    base64 data URL, and tries each provider in turn until one returns a
    non-empty answer.
    """

    name: str = "vision"
    description: str = (
        "Analyze a local image or image URL (jpg/jpeg/png) using Vision API. "
        "Can describe content, extract text, identify objects, colors, etc. "
    )
    params: dict = {
        "type": "object",
        "properties": {
            "image": {
                "type": "string",
                "description": "Local file path or HTTP(S) URL of the image to analyze",
            },
            "question": {
                "type": "string",
                "description": "Question to ask about the image",
            },
        },
        "required": ["image", "question"],
    }

    def __init__(self, config: Optional[dict] = None):
        """Store the tool configuration (an empty dict when none is given)."""
        self.config = config or {}

    @staticmethod
    def is_available() -> bool:
        """Report the tool as always available; providers are resolved per call."""
        return True

    def execute(self, args: Dict[str, Any]) -> ToolResult:
        """
        Run the tool.

        :param args: dict with the keys ``image`` (local path or HTTP(S) URL)
            and ``question``.
        :return: a successful ToolResult from the first provider that answers,
            or a failed ToolResult describing what went wrong.
        """
        # Callers may send null / non-string values; coerce them so validation
        # yields a clean error instead of an AttributeError on .strip().
        image = str(args.get("image") or "").strip()
        question = str(args.get("question") or "").strip()
        if not image:
            return ToolResult.fail("Error: 'image' parameter is required")
        if not question:
            return ToolResult.fail("Error: 'question' parameter is required")

        providers = self._resolve_providers()
        if not providers:
            return ToolResult.fail(
                "Error: No model available for Vision.\n"
                "The main model does not support vision and no other API keys are configured.\n"
                "Options:\n"
                " 1. Switch to a multimodal model (e.g. qwen3.6-plus, claude-sonnet-4-6, gemini-2.0-flash)\n"
                " 2. Configure OPENAI_API_KEY: env_config(action=\"set\", key=\"OPENAI_API_KEY\", value=\"your-key\")\n"
                " 3. Configure LINKAI_API_KEY: env_config(action=\"set\", key=\"LINKAI_API_KEY\", value=\"your-key\")"
            )

        try:
            image_content = self._build_image_content(image)
        except Exception as e:
            return ToolResult.fail(f"Error: {e}")

        return self._call_with_fallback(providers, DEFAULT_MODEL, question, image_content)

    def _call_with_fallback(self, providers: List[VisionProvider], model: str,
                            question: str, image_content: dict) -> ToolResult:
        """
        Try each provider in order; fall back to the next one on failure.

        :param providers: providers to try, highest priority first.
        :param model: model name used when a provider has no model_override.
        :param question: question to ask about the image.
        :param image_content: OpenAI-format ``image_url`` content block.
        :return: the first successful ToolResult, or a failed ToolResult
            listing every provider's error.
        """
        errors: List[str] = []
        for i, provider in enumerate(providers):
            use_model = provider.model_override or model
            try:
                logger.info(f"[Vision] Trying provider '{provider.name}' "
                            f"with model '{use_model}' ({i + 1}/{len(providers)})")
                if provider.use_bot:
                    result = self._call_via_bot(use_model, question, image_content, provider)
                else:
                    result = self._call_api(provider, use_model, question, image_content)
                logger.info(f"[Vision] ✅ Success via {provider.name} (model={use_model})")
                return result
            except VisionAPIError as e:
                errors.append(f"[{provider.name}/{use_model}] {e}")
                logger.warning(f"[Vision] Provider '{provider.name}' failed: {e}")
            except requests.Timeout:
                errors.append(f"[{provider.name}/{use_model}] Request timed out after {DEFAULT_TIMEOUT}s")
                logger.warning(f"[Vision] Provider '{provider.name}' timed out")
            except requests.ConnectionError:
                errors.append(f"[{provider.name}/{use_model}] Connection failed")
                logger.warning(f"[Vision] Provider '{provider.name}' connection failed")
            except Exception as e:
                # Anything else is unexpected: keep going, but log the traceback.
                errors.append(f"[{provider.name}/{use_model}] {e}")
                logger.error(f"[Vision] Provider '{provider.name}' unexpected error: {e}", exc_info=True)

        return ToolResult.fail(
            "Error: All Vision API providers failed.\n" + "\n".join(f" - {err}" for err in errors)
        )

    def _resolve_providers(self) -> List[VisionProvider]:
        """
        Build an ordered list of available providers.

        Priority:
        - use_linkai=true → [LinkAI, MainModel, OtherModels…, OpenAI]
        - default → [MainModel, OtherModels…, OpenAI, LinkAI]

        "OtherModels" are auto-discovered from configured API keys.
        The main model's bot_type is excluded from OtherModels to avoid
        duplicating the MainModel provider.
        """
        use_linkai = conf().get("use_linkai", False) and conf().get("linkai_api_key")
        providers: List[VisionProvider] = []

        # LinkAI goes first when promoted, otherwise last; the middle is fixed.
        if use_linkai:
            self._append_provider(providers, self._build_linkai_provider)
        self._append_provider(providers, self._build_main_model_provider)
        self._append_other_model_providers(providers)
        self._append_provider(providers, self._build_openai_provider)
        if not use_linkai:
            self._append_provider(providers, self._build_linkai_provider)
        return providers

    @staticmethod
    def _append_provider(providers: List[VisionProvider], builder) -> None:
        """Call ``builder()`` and append its result to ``providers`` when it is truthy."""
        p = builder()
        if p:
            providers.append(p)

    def _append_other_model_providers(self, providers: List[VisionProvider]) -> None:
        """
        Auto-discover other models whose API key is configured.

        Skips the main model's own bot_type (already covered by the MainModel
        provider) and any display name already present in ``providers``.
        Entries whose bot cannot be created, or whose bot has no
        ``call_vision``, are skipped.
        """
        # Determine main model's bot_type so we can skip it
        main_model = getattr(self, "model", None)
        main_bot_type = None
        if main_model and hasattr(main_model, '_resolve_bot_type'):
            main_bot_type = main_model._resolve_bot_type(conf().get("model", ""))

        existing_names = {p.name for p in providers}
        for config_key, bot_type, default_model, display_name in _DISCOVERABLE_MODELS:
            if display_name in existing_names or bot_type == main_bot_type:
                continue
            api_key = conf().get(config_key, "")
            if not isinstance(api_key, str) or not api_key.strip():
                continue

            # Create a bot instance and check if it supports call_vision
            try:
                from models.bot_factory import create_bot
                bot = create_bot(bot_type)
            except Exception as e:
                # Best-effort discovery: skip this provider but leave a trace.
                logger.debug(f"[Vision] Skipping provider '{display_name}': {e}")
                continue
            if not hasattr(bot, 'call_vision'):
                continue

            providers.append(VisionProvider(
                name=display_name,
                api_key="",
                api_base="",
                model_override=default_model,
                use_bot=True,
                fallback_bot=bot,
            ))

    def _resolve_vision_model(self) -> Optional[str]:
        """
        Determine which model name the main-model provider should use.

        1. User explicit config: tool.vision.model in config.json
        2. Fallback to the main configured model name

        :return: the model name, or None when neither is configured.
        """
        tool_conf = conf().get("tool", {})
        # Guard both levels: either may be missing or not a dict in the config.
        vision_conf = tool_conf.get("vision") if isinstance(tool_conf, dict) else None
        user_vision_model = vision_conf.get("model") if isinstance(vision_conf, dict) else None
        if user_vision_model:
            return user_vision_model
        return conf().get("model", "") or None

    def _build_main_model_provider(self) -> Optional[VisionProvider]:
        """
        Use the vendor's own model for vision via bot.call_vision.

        :return: a bot-backed provider, or None when there is no main model
            or its bot has no ``call_vision``.
        """
        main_model = getattr(self, "model", None)
        if not (main_model and hasattr(main_model, 'bot')):
            return None
        try:
            if not hasattr(main_model.bot, 'call_vision'):
                return None
        except Exception:
            # Accessing .bot may itself fail; treat that as "not available".
            return None

        return VisionProvider(
            name=_MAIN_MODEL_PROVIDER_NAME,
            api_key="",
            api_base="",
            model_override=self._resolve_vision_model(),
            use_bot=True,
        )

    def _build_openai_provider(self) -> Optional[VisionProvider]:
        """Build the raw-HTTP OpenAI provider from config/env, or None without an API key."""
        api_key = conf().get("open_ai_api_key") or os.environ.get("OPENAI_API_KEY")
        if not api_key:
            return None
        api_base = (conf().get("open_ai_api_base") or os.environ.get("OPENAI_API_BASE", "")).rstrip("/") \
            or "https://api.openai.com/v1"
        return VisionProvider(name="OpenAI", api_key=api_key, api_base=self._ensure_v1(api_base))

    def _build_linkai_provider(self) -> Optional[VisionProvider]:
        """Build the raw-HTTP LinkAI provider from config/env, or None without an API key."""
        api_key = conf().get("linkai_api_key") or os.environ.get("LINKAI_API_KEY")
        if not api_key:
            return None
        api_base = (conf().get("linkai_api_base") or os.environ.get("LINKAI_API_BASE", "")).rstrip("/") \
            or "https://api.link-ai.tech"

        from common.utils import get_cloud_headers
        # Work on a copy so the helper's returned dict is never mutated.
        extra = dict(get_cloud_headers(api_key) or {})
        # _call_api sets these two headers itself.
        extra.pop("Authorization", None)
        extra.pop("Content-Type", None)
        return VisionProvider(name="LinkAI", api_key=api_key, api_base=self._ensure_v1(api_base),
                              extra_headers=extra)

    def _call_via_bot(self, model: str, question: str, image_content: dict,
                      provider: Optional[VisionProvider] = None) -> ToolResult:
        """
        Call a bot's ``call_vision`` method.

        Uses the provider's ``fallback_bot`` if set, otherwise the main model bot.

        :raises VisionAPIError: on any failure (bot unavailable, call raised,
            error or empty response) so fallback can proceed.
        """
        provider_name = provider.name if provider else _MAIN_MODEL_PROVIDER_NAME
        try:
            bot = (provider and provider.fallback_bot) or self.model.bot
        except Exception as e:
            raise VisionAPIError(f"Cannot access bot: {e}") from e

        # Extract the raw image URL from the OpenAI-format image_content block
        image_url = image_content.get("image_url", {}).get("url", "")
        if not image_url:
            raise VisionAPIError("No image URL in content block")

        try:
            response = bot.call_vision(
                image_url=image_url,
                question=question,
                model=model,
                max_tokens=MAX_TOKENS,
            )
        except Exception as e:
            raise VisionAPIError(f"call_vision failed: {e}") from e

        if response is NotImplemented:
            raise VisionAPIError("Bot does not support vision")

        # Anything that is not a dict carries no usable content.
        data = response if isinstance(response, dict) else {}
        if data.get("error"):
            raise VisionAPIError(f"API error - {data.get('message', 'Unknown')}")
        content = data.get("content", "")
        if not content:
            raise VisionAPIError(f"Empty response from {provider_name}")

        return ToolResult.success({
            # Use the actual model name from the bot response if available
            "model": data.get("model") or model,
            "provider": provider_name,
            "content": content,
            "usage": data.get("usage", {}),
        })

    @staticmethod
    def _ensure_v1(api_base: str) -> str:
        """
        Append /v1 unless the URL path already ends with a version segment.

        A version segment is the last *path* segment starting with ``v``
        followed by a digit (``v1``, ``v4``, ``v1beta``). The host is never
        considered, so ``https://vllm.local`` still gets ``/v1`` appended.

        :param api_base: base URL, with or without a trailing slash.
        :return: ``api_base`` unchanged when empty or already versioned,
            otherwise the URL with ``/v1`` appended.
        """
        if not api_base:
            return api_base
        trimmed = api_base.rstrip("/")
        # Drop the scheme and the host so only the path is inspected.
        without_scheme = trimmed.split("://", 1)[-1]
        _host, _sep, path = without_scheme.partition("/")
        last_segment = path.rsplit("/", 1)[-1]
        if len(last_segment) >= 2 and last_segment[0] == "v" and last_segment[1].isdigit():
            return api_base
        return trimmed + "/v1"

    @staticmethod
    def _sniff_mime_type(data: bytes) -> Optional[str]:
        """Return the image MIME type indicated by the leading magic bytes, or None."""
        if data.startswith(b"\xff\xd8\xff"):
            return "image/jpeg"
        if data.startswith(b"\x89PNG\r\n\x1a\n"):
            return "image/png"
        if data.startswith((b"GIF87a", b"GIF89a")):
            return "image/gif"
        if data[:4] == b"RIFF" and data[8:12] == b"WEBP":
            return "image/webp"
        return None

    def _build_image_content(self, image: str) -> dict:
        """
        Build the image_url content block.

        Both remote URLs and local files are converted to base64 data URLs
        so every bot backend can consume them without extra downloads.

        :raises FileNotFoundError: when a local path does not exist.
        :raises ValueError: when a local file has an unsupported extension.
        """
        if image.startswith(("http://", "https://")):
            return self._download_to_data_url(image)

        if not os.path.isfile(image):
            raise FileNotFoundError(f"Image file not found: {image}")

        ext = image.rsplit(".", 1)[-1].lower() if "." in image else ""
        mime_type = SUPPORTED_EXTENSIONS.get(ext)
        if not mime_type:
            raise ValueError(
                f"Unsupported image format '.{ext}'. "
                f"Supported: {', '.join(SUPPORTED_EXTENSIONS.keys())}"
            )

        file_path = self._maybe_compress(image)
        try:
            with open(file_path, "rb") as f:
                raw = f.read()
        finally:
            # Only delete the temporary compressed copy, never the user's file.
            if file_path != image and os.path.exists(file_path):
                os.remove(file_path)

        # Compression may have re-encoded the image, so trust the bytes over
        # the original extension when the format is recognizable.
        mime_type = self._sniff_mime_type(raw) or mime_type
        b64 = base64.b64encode(raw).decode("ascii")
        data_url = f"data:{mime_type};base64,{b64}"
        return {"type": "image_url", "image_url": {"url": data_url}}

    @staticmethod
    def _download_to_data_url(url: str) -> dict:
        """
        Download a remote image and return it as a base64 data URL block.

        The MIME type comes from the payload's magic bytes when recognizable,
        then from an ``image/*`` Content-Type header, else ``image/jpeg``.

        :raises VisionAPIError: when the server does not answer with HTTP 200.
        """
        resp = requests.get(url, timeout=30)
        if resp.status_code != 200:
            raise VisionAPIError(f"Failed to download image: HTTP {resp.status_code}")

        header_type = resp.headers.get("Content-Type", "image/jpeg").split(";")[0].strip().lower()
        if not header_type.startswith("image/"):
            header_type = "image/jpeg"
        content_type = Vision._sniff_mime_type(resp.content) or header_type

        b64 = base64.b64encode(resp.content).decode("ascii")
        data_url = f"data:{content_type};base64,{b64}"
        return {"type": "image_url", "image_url": {"url": data_url}}

    @staticmethod
    def _maybe_compress(path: str) -> str:
        """
        Compress image to under COMPRESS_THRESHOLD with max long-edge 1536px.

        Tries the ``sips`` and then the ``convert`` command-line tools at
        decreasing quality settings.

        :return: ``path`` itself when the file is small enough or no tool
            produced output; otherwise the path of a temporary file that the
            caller must delete. The temporary file may still exceed the
            threshold (best effort).
        """
        file_size = os.path.getsize(path)
        if file_size <= COMPRESS_THRESHOLD:
            return path

        tmp = tempfile.NamedTemporaryFile(suffix=".jpg", delete=False)
        tmp.close()
        tmp_path = tmp.name

        def _run(cmd: List[str]) -> bool:
            """Run one external command; False when it is missing, unusable or fails."""
            try:
                subprocess.run(cmd, capture_output=True, check=True)
                return True
            except (OSError, subprocess.CalledProcessError):
                # OSError covers a missing binary as well as a non-executable one.
                return False

        def _try_sips(max_dim: str, quality: str) -> bool:
            return _run(["sips", "-Z", max_dim, "-s", "formatOptions", quality,
                         path, "--out", tmp_path])

        def _try_convert(max_dim: str, quality: str) -> bool:
            return _run(["convert", path, "-resize", f"{max_dim}x{max_dim}>",
                         "-quality", quality, tmp_path])

        attempts = [
            ("1536", "85"),
            ("1536", "70"),
            ("1536", "50"),
        ]
        try:
            for max_dim, quality in attempts:
                if not (_try_sips(max_dim, quality) or _try_convert(max_dim, quality)):
                    continue
                new_size = os.path.getsize(tmp_path)
                logger.debug(f"[Vision] Compressed image "
                             f"({file_size // 1024}KB -> {new_size // 1024}KB, "
                             f"max_dim={max_dim}, q={quality})")
                if new_size <= COMPRESS_THRESHOLD:
                    return tmp_path

            # Still too large: hand back whatever was produced as a best effort.
            if os.path.exists(tmp_path) and os.path.getsize(tmp_path) > 0:
                return tmp_path
        except Exception:
            # Do not leave the temporary file behind on an unexpected failure.
            if os.path.exists(tmp_path):
                os.remove(tmp_path)
            raise

        if os.path.exists(tmp_path):
            os.remove(tmp_path)
        return path

    def _call_api(self, provider: VisionProvider, model: str,
                  question: str, image_content: dict) -> ToolResult:
        """
        Call a single provider's chat-completions endpoint over raw HTTP.

        :raises VisionAPIError: on recoverable failures (non-200 status,
            invalid JSON, an API error object, or an empty answer) so the
            caller can try the next provider.
        """
        payload = {
            "model": model,
            "messages": [
                {
                    "role": "user",
                    "content": [
                        {"type": "text", "text": question},
                        image_content,
                    ],
                }
            ],
        }
        headers = {
            "Authorization": f"Bearer {provider.api_key}",
            "Content-Type": "application/json",
            **provider.extra_headers,
        }
        resp = requests.post(
            f"{provider.api_base}/chat/completions",
            headers=headers,
            json=payload,
            timeout=DEFAULT_TIMEOUT,
        )
        if resp.status_code != 200:
            raise VisionAPIError(f"HTTP {resp.status_code}: {resp.text[:200]}")

        try:
            data = resp.json()
        except ValueError as e:
            raise VisionAPIError(f"Invalid JSON response: {resp.text[:200]}") from e
        if not isinstance(data, dict):
            raise VisionAPIError("Unexpected response format")

        # A null or empty "error" field is not treated as a failure.
        error = data.get("error")
        if error:
            msg = error.get("message", "Unknown API error") if isinstance(error, dict) else str(error)
            raise VisionAPIError(f"API error - {msg}")

        choices = data.get("choices") or []
        first = choices[0] if choices and isinstance(choices[0], dict) else {}
        message = first.get("message")
        content = message.get("content") if isinstance(message, dict) else ""
        if not content:
            # Matches _call_via_bot: an empty answer should trigger fallback.
            raise VisionAPIError(f"Empty response from {provider.name}")

        usage = data.get("usage") or {}
        result = {
            "model": model,
            "provider": provider.name,
            "content": content,
            "usage": {
                "prompt_tokens": usage.get("prompt_tokens", 0),
                "completion_tokens": usage.get("completion_tokens", 0),
                "total_tokens": usage.get("total_tokens", 0),
            },
        }
        return ToolResult.success(result)