feat: make web search a built-in tool

This commit is contained in:
zhayujie
2026-02-09 11:37:11 +08:00
parent 6c218331b1
commit 4f0ea5d756
7 changed files with 435 additions and 32 deletions

View File

@@ -99,19 +99,24 @@ class Agent:
def get_full_system_prompt(self, skill_filter=None) -> str: def get_full_system_prompt(self, skill_filter=None) -> str:
""" """
Get the full system prompt including skills. Get the full system prompt including skills.
Note: Skills are now built into the system prompt by PromptBuilder, Note: Skills are now built into the system prompt by PromptBuilder,
so we just return the base prompt directly. This method is kept for so we just return the base prompt directly. This method is kept for
backward compatibility. backward compatibility.
:param skill_filter: Optional list of skill names to include (deprecated) :param skill_filter: Optional list of skill names to include (deprecated)
:return: Complete system prompt :return: Complete system prompt
""" """
# Skills are now included in system_prompt by PromptBuilder prompt = self.system_prompt
# Rebuild tool list section to reflect current self.tools
prompt = self._rebuild_tool_list_section(prompt)
# If runtime_info contains dynamic time function, rebuild runtime section # If runtime_info contains dynamic time function, rebuild runtime section
if self.runtime_info and callable(self.runtime_info.get('_get_current_time')): if self.runtime_info and callable(self.runtime_info.get('_get_current_time')):
return self._rebuild_runtime_section(self.system_prompt) prompt = self._rebuild_runtime_section(prompt)
return self.system_prompt
return prompt
def _rebuild_runtime_section(self, prompt: str) -> str: def _rebuild_runtime_section(self, prompt: str) -> str:
""" """
@@ -161,7 +166,31 @@ class Agent:
except Exception as e: except Exception as e:
logger.warning(f"Failed to rebuild runtime section: {e}") logger.warning(f"Failed to rebuild runtime section: {e}")
return prompt return prompt
def _rebuild_tool_list_section(self, prompt: str) -> str:
"""
Rebuild the tool list inside the '## 工具系统' section so that it
always reflects the current ``self.tools`` (handles dynamic add/remove
of conditional tools like web_search).
"""
import re
from agent.prompt.builder import _build_tooling_section
try:
if not self.tools:
return prompt
new_lines = _build_tooling_section(self.tools, "zh")
new_section = "\n".join(new_lines).rstrip("\n")
# Replace existing tooling section
pattern = r'## 工具系统\s*\n.*?(?=\n## |\Z)'
updated = re.sub(pattern, new_section, prompt, count=1, flags=re.DOTALL)
return updated
except Exception as e:
logger.warning(f"Failed to rebuild tool list section: {e}")
return prompt
def refresh_skills(self): def refresh_skills(self):
"""Refresh the loaded skills.""" """Refresh the loaded skills."""
if self.skill_manager: if self.skill_manager:

View File

@@ -45,16 +45,25 @@ def _import_optional_tools():
) )
except Exception as e: except Exception as e:
logger.error(f"[Tools] Scheduler tool failed to load: {e}") logger.error(f"[Tools] Scheduler tool failed to load: {e}")
# WebSearch Tool (conditionally loaded based on API key availability at init time)
try:
from agent.tools.web_search.web_search import WebSearch
tools['WebSearch'] = WebSearch
except ImportError as e:
logger.error(f"[Tools] WebSearch not loaded - missing dependency: {e}")
except Exception as e:
logger.error(f"[Tools] WebSearch failed to load: {e}")
return tools return tools
# Load optional tools # Load optional tools
_optional_tools = _import_optional_tools() _optional_tools = _import_optional_tools()
EnvConfig = _optional_tools.get('EnvConfig') EnvConfig = _optional_tools.get('EnvConfig')
SchedulerTool = _optional_tools.get('SchedulerTool') SchedulerTool = _optional_tools.get('SchedulerTool')
WebSearch = _optional_tools.get('WebSearch')
GoogleSearch = _optional_tools.get('GoogleSearch') GoogleSearch = _optional_tools.get('GoogleSearch')
FileSave = _optional_tools.get('FileSave') FileSave = _optional_tools.get('FileSave')
Terminal = _optional_tools.get('Terminal') Terminal = _optional_tools.get('Terminal')
@@ -92,6 +101,7 @@ __all__ = [
'MemoryGetTool', 'MemoryGetTool',
'EnvConfig', 'EnvConfig',
'SchedulerTool', 'SchedulerTool',
'WebSearch',
# Optional tools (may be None if dependencies not available) # Optional tools (may be None if dependencies not available)
# 'BrowserTool' # 'BrowserTool'
] ]

View File

@@ -20,7 +20,8 @@ class SchedulerTool(BaseTool):
name: str = "scheduler" name: str = "scheduler"
description: str = ( description: str = (
"创建、查询和管理定时任务。支持固定消息和AI任务两种类型\n\n" "创建、查询和管理定时任务(提醒、周期性任务等)\n\n"
"⚠️ 重要:仅当需要「定时/提醒/每天/每周/X分钟后/X点」等延迟或周期执行时才使用此工具。"
"使用方法:\n" "使用方法:\n"
"- 创建action='create', name='任务名', message/ai_task='内容', schedule_type='once/interval/cron', schedule_value='...'\n" "- 创建action='create', name='任务名', message/ai_task='内容', schedule_type='once/interval/cron', schedule_value='...'\n"
"- 查询action='list' / action='get', task_id='任务ID'\n" "- 查询action='list' / action='get', task_id='任务ID'\n"
@@ -53,7 +54,7 @@ class SchedulerTool(BaseTool):
}, },
"ai_task": { "ai_task": {
"type": "string", "type": "string",
"description": "AI任务描述 (与message二选一)'搜索今日新闻''查询天气'" "description": "AI任务描述 (与message二选一)用于定时让AI执行的任务"
}, },
"schedule_type": { "schedule_type": {
"type": "string", "type": "string",

View File

@@ -0,0 +1,3 @@
from agent.tools.web_search.web_search import WebSearch
__all__ = ["WebSearch"]

View File

@@ -0,0 +1,322 @@
"""
Web Search tool - Search the web using Bocha or LinkAI search API.
Supports two backends with unified response format:
1. Bocha Search (primary, requires BOCHA_API_KEY)
2. LinkAI Search (fallback, requires LINKAI_API_KEY)
"""
import os
import json
from typing import Dict, Any, Optional
import requests
from agent.tools.base_tool import BaseTool, ToolResult
from common.log import logger
# Default timeout for API requests (seconds)
DEFAULT_TIMEOUT = 30
class WebSearch(BaseTool):
"""Tool for searching the web using Bocha or LinkAI search API"""
name: str = "web_search"
description: str = (
"Search the web for current information, news, research topics, or any real-time data. "
"Returns web page titles, URLs, snippets, and optional summaries. "
"Use this when the user asks about recent events, needs fact-checking, or wants up-to-date information."
)
params: dict = {
"type": "object",
"properties": {
"query": {
"type": "string",
"description": "Search query string"
},
"count": {
"type": "integer",
"description": "Number of results to return (1-50, default: 10)"
},
"freshness": {
"type": "string",
"description": (
"Time range filter. Options: "
"'noLimit' (default), 'oneDay', 'oneWeek', 'oneMonth', 'oneYear', "
"or date range like '2025-01-01..2025-02-01'"
)
},
"summary": {
"type": "boolean",
"description": "Whether to include text summary for each result (default: false)"
}
},
"required": ["query"]
}
def __init__(self, config: dict = None):
self.config = config or {}
self._backend = None # Will be resolved on first execute
@staticmethod
def is_available() -> bool:
"""Check if web search is available (at least one API key is configured)"""
return bool(os.environ.get("BOCHA_API_KEY") or os.environ.get("LINKAI_API_KEY"))
def _resolve_backend(self) -> Optional[str]:
"""
Determine which search backend to use.
Priority: Bocha > LinkAI
:return: 'bocha', 'linkai', or None
"""
if os.environ.get("BOCHA_API_KEY"):
return "bocha"
if os.environ.get("LINKAI_API_KEY"):
return "linkai"
return None
def execute(self, args: Dict[str, Any]) -> ToolResult:
"""
Execute web search
:param args: Search parameters (query, count, freshness, summary)
:return: Search results
"""
query = args.get("query", "").strip()
if not query:
return ToolResult.fail("Error: 'query' parameter is required")
count = args.get("count", 10)
freshness = args.get("freshness", "noLimit")
summary = args.get("summary", False)
# Validate count
if not isinstance(count, int) or count < 1 or count > 50:
count = 10
# Resolve backend
backend = self._resolve_backend()
if not backend:
return ToolResult.fail(
"Error: No search API key configured. "
"Please set BOCHA_API_KEY or LINKAI_API_KEY using env_config tool.\n"
" - Bocha Search: https://open.bocha.cn\n"
" - LinkAI Search: https://link-ai.tech"
)
try:
if backend == "bocha":
return self._search_bocha(query, count, freshness, summary)
else:
return self._search_linkai(query, count, freshness)
except requests.Timeout:
return ToolResult.fail(f"Error: Search request timed out after {DEFAULT_TIMEOUT}s")
except requests.ConnectionError:
return ToolResult.fail("Error: Failed to connect to search API")
except Exception as e:
logger.error(f"[WebSearch] Unexpected error: {e}", exc_info=True)
return ToolResult.fail(f"Error: Search failed - {str(e)}")
def _search_bocha(self, query: str, count: int, freshness: str, summary: bool) -> ToolResult:
"""
Search using Bocha API
:param query: Search query
:param count: Number of results
:param freshness: Time range filter
:param summary: Whether to include summary
:return: Formatted search results
"""
api_key = os.environ.get("BOCHA_API_KEY", "")
url = "https://api.bocha.cn/v1/web-search"
headers = {
"Authorization": f"Bearer {api_key}",
"Content-Type": "application/json",
"Accept": "application/json"
}
payload = {
"query": query,
"count": count,
"freshness": freshness,
"summary": summary
}
logger.debug(f"[WebSearch] Bocha search: query='{query}', count={count}")
response = requests.post(url, headers=headers, json=payload, timeout=DEFAULT_TIMEOUT)
if response.status_code == 401:
return ToolResult.fail("Error: Invalid BOCHA_API_KEY. Please check your API key.")
if response.status_code == 403:
return ToolResult.fail("Error: Bocha API - insufficient balance. Please top up at https://open.bocha.cn")
if response.status_code == 429:
return ToolResult.fail("Error: Bocha API rate limit reached. Please try again later.")
if response.status_code != 200:
return ToolResult.fail(f"Error: Bocha API returned HTTP {response.status_code}")
data = response.json()
# Check API-level error code
api_code = data.get("code")
if api_code is not None and api_code != 200:
msg = data.get("msg") or "Unknown error"
return ToolResult.fail(f"Error: Bocha API error (code={api_code}): {msg}")
# Extract and format results
return self._format_bocha_results(data, query)
def _format_bocha_results(self, data: dict, query: str) -> ToolResult:
"""
Format Bocha API response into unified result structure
:param data: Raw API response
:param query: Original query
:return: Formatted ToolResult
"""
search_data = data.get("data", {})
web_pages = search_data.get("webPages", {})
pages = web_pages.get("value", [])
if not pages:
return ToolResult.success({
"query": query,
"backend": "bocha",
"total": 0,
"results": [],
"message": "No results found"
})
results = []
for page in pages:
result = {
"title": page.get("name", ""),
"url": page.get("url", ""),
"snippet": page.get("snippet", ""),
"siteName": page.get("siteName", ""),
"datePublished": page.get("datePublished") or page.get("dateLastCrawled", ""),
}
# Include summary only if present
if page.get("summary"):
result["summary"] = page["summary"]
results.append(result)
total = web_pages.get("totalEstimatedMatches", len(results))
return ToolResult.success({
"query": query,
"backend": "bocha",
"total": total,
"count": len(results),
"results": results
})
def _search_linkai(self, query: str, count: int, freshness: str) -> ToolResult:
"""
Search using LinkAI plugin API
:param query: Search query
:param count: Number of results
:param freshness: Time range filter
:return: Formatted search results
"""
api_key = os.environ.get("LINKAI_API_KEY", "")
url = "https://api.link-ai.tech/v1/plugin/execute"
headers = {
"Content-Type": "application/json",
"Authorization": f"Bearer {api_key}"
}
payload = {
"code": "web-search",
"args": {
"query": query,
"count": count,
"freshness": freshness
}
}
logger.debug(f"[WebSearch] LinkAI search: query='{query}', count={count}")
response = requests.post(url, headers=headers, json=payload, timeout=DEFAULT_TIMEOUT)
if response.status_code == 401:
return ToolResult.fail("Error: Invalid LINKAI_API_KEY. Please check your API key.")
if response.status_code != 200:
return ToolResult.fail(f"Error: LinkAI API returned HTTP {response.status_code}")
data = response.json()
if not data.get("success"):
msg = data.get("message") or "Unknown error"
return ToolResult.fail(f"Error: LinkAI search failed: {msg}")
return self._format_linkai_results(data, query)
def _format_linkai_results(self, data: dict, query: str) -> ToolResult:
"""
Format LinkAI API response into unified result structure.
LinkAI returns the search data in data.data field, which follows
the same Bing-compatible format as Bocha.
:param data: Raw API response
:param query: Original query
:return: Formatted ToolResult
"""
raw_data = data.get("data", "")
# LinkAI may return data as a JSON string
if isinstance(raw_data, str):
try:
raw_data = json.loads(raw_data)
except (json.JSONDecodeError, TypeError):
# If data is plain text, return it as a single result
return ToolResult.success({
"query": query,
"backend": "linkai",
"total": 1,
"count": 1,
"results": [{"content": raw_data}]
})
# If the response follows Bing-compatible structure
if isinstance(raw_data, dict):
web_pages = raw_data.get("webPages", {})
pages = web_pages.get("value", [])
if pages:
results = []
for page in pages:
result = {
"title": page.get("name", ""),
"url": page.get("url", ""),
"snippet": page.get("snippet", ""),
"siteName": page.get("siteName", ""),
"datePublished": page.get("datePublished") or page.get("dateLastCrawled", ""),
}
if page.get("summary"):
result["summary"] = page["summary"]
results.append(result)
total = web_pages.get("totalEstimatedMatches", len(results))
return ToolResult.success({
"query": query,
"backend": "linkai",
"total": total,
"count": len(results),
"results": results
})
# Fallback: return raw data
return ToolResult.success({
"query": query,
"backend": "linkai",
"total": 1,
"count": 1,
"results": [{"content": str(raw_data)}]
})

View File

@@ -494,39 +494,70 @@ class AgentBridge:
def refresh_all_skills(self) -> int: def refresh_all_skills(self) -> int:
""" """
Refresh skills in all agent instances after environment variable changes. Refresh skills and conditional tools in all agent instances after
This allows hot-reload of skills without restarting the agent. environment variable changes. This allows hot-reload without restarting.
Returns: Returns:
Number of agent instances refreshed Number of agent instances refreshed
""" """
import os import os
from dotenv import load_dotenv from dotenv import load_dotenv
from config import conf from config import conf
# Reload environment variables from .env file # Reload environment variables from .env file
workspace_root = expand_path(conf().get("agent_workspace", "~/cow")) workspace_root = expand_path(conf().get("agent_workspace", "~/cow"))
env_file = os.path.join(workspace_root, '.env') env_file = os.path.join(workspace_root, '.env')
if os.path.exists(env_file): if os.path.exists(env_file):
load_dotenv(env_file, override=True) load_dotenv(env_file, override=True)
logger.info(f"[AgentBridge] Reloaded environment variables from {env_file}") logger.info(f"[AgentBridge] Reloaded environment variables from {env_file}")
refreshed_count = 0 refreshed_count = 0
# Refresh default agent # Collect all agent instances to refresh
if self.default_agent and hasattr(self.default_agent, 'skill_manager'): agents_to_refresh = []
self.default_agent.skill_manager.refresh_skills() if self.default_agent:
refreshed_count += 1 agents_to_refresh.append(("default", self.default_agent))
logger.info("[AgentBridge] Refreshed skills in default agent")
# Refresh all session agents
for session_id, agent in self.agents.items(): for session_id, agent in self.agents.items():
if hasattr(agent, 'skill_manager'): agents_to_refresh.append((session_id, agent))
for label, agent in agents_to_refresh:
# Refresh skills
if hasattr(agent, 'skill_manager') and agent.skill_manager:
agent.skill_manager.refresh_skills() agent.skill_manager.refresh_skills()
refreshed_count += 1
# Refresh conditional tools (e.g. web_search depends on API keys)
self._refresh_conditional_tools(agent)
refreshed_count += 1
if refreshed_count > 0: if refreshed_count > 0:
logger.info(f"[AgentBridge] Refreshed skills in {refreshed_count} agent instance(s)") logger.info(f"[AgentBridge] Refreshed skills & tools in {refreshed_count} agent instance(s)")
return refreshed_count return refreshed_count
@staticmethod
def _refresh_conditional_tools(agent):
"""
Add or remove conditional tools based on current environment variables.
For example, web_search should only be present when BOCHA_API_KEY or
LINKAI_API_KEY is set.
"""
try:
from agent.tools.web_search.web_search import WebSearch
has_tool = any(t.name == "web_search" for t in agent.tools)
available = WebSearch.is_available()
if available and not has_tool:
# API key was added - inject the tool
tool = WebSearch()
tool.model = agent.model
agent.tools.append(tool)
logger.info("[AgentBridge] web_search tool added (API key now available)")
elif not available and has_tool:
# API key was removed - remove the tool
agent.tools = [t for t in agent.tools if t.name != "web_search"]
logger.info("[AgentBridge] web_search tool removed (API key no longer available)")
except Exception as e:
logger.debug(f"[AgentBridge] Failed to refresh conditional tools: {e}")

View File

@@ -219,13 +219,20 @@ class AgentInitializer:
for tool_name in tool_manager.tool_classes.keys(): for tool_name in tool_manager.tool_classes.keys():
try: try:
# Skip web_search if no API key is available
if tool_name == "web_search":
from agent.tools.web_search.web_search import WebSearch
if not WebSearch.is_available():
logger.debug("[AgentInitializer] WebSearch skipped - no BOCHA_API_KEY or LINKAI_API_KEY")
continue
# Special handling for EnvConfig tool # Special handling for EnvConfig tool
if tool_name == "env_config": if tool_name == "env_config":
from agent.tools import EnvConfig from agent.tools import EnvConfig
tool = EnvConfig({"agent_bridge": self.agent_bridge}) tool = EnvConfig({"agent_bridge": self.agent_bridge})
else: else:
tool = tool_manager.create_tool(tool_name) tool = tool_manager.create_tool(tool_name)
if tool: if tool:
# Apply workspace config to file operation tools # Apply workspace config to file operation tools
if tool_name in ['read', 'write', 'edit', 'bash', 'grep', 'find', 'ls']: if tool_name in ['read', 'write', 'edit', 'bash', 'grep', 'find', 'ls']: