From 307769b9493866d2a3a0a7bea2ff1c667dc162da Mon Sep 17 00:00:00 2001 From: zhayujie Date: Fri, 8 May 2026 15:22:42 +0800 Subject: [PATCH] feat(mcp): load MCP servers asynchronously at startup MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Boot MCP servers (npx/uvx) on a background thread instead of blocking agent init. Built-in tools serve traffic immediately while MCP comes online; each new agent reads whatever is ready at creation time. Idempotent via _mcp_loaded flag — concurrent sessions never re-fork subprocesses. Per-server failures are isolated and warmup is triggered in app.py so loading overlaps with channel startup. --- agent/tools/tool_manager.py | 114 +++++++++++++++++++++++++++---- app.py | 18 +++++ bridge/agent_initializer.py | 16 +++-- channel/web/static/js/console.js | 4 +- config.py | 8 +-- 5 files changed, 133 insertions(+), 27 deletions(-) diff --git a/agent/tools/tool_manager.py b/agent/tools/tool_manager.py index f0b3cc09..f20c5f9e 100644 --- a/agent/tools/tool_manager.py +++ b/agent/tools/tool_manager.py @@ -1,5 +1,6 @@ import importlib import importlib.util +import threading from pathlib import Path from typing import Dict, Any, Type from agent.tools.base_tool import BaseTool @@ -46,9 +47,23 @@ class ToolManager: if not hasattr(self, 'tool_classes'): self.tool_classes = {} # Dictionary to store tool classes if not hasattr(self, '_mcp_registry'): - self._mcp_registry = None # 懒初始化,有配置时才创建 + self._mcp_registry = None # Lazy init: only created when MCP servers are configured if not hasattr(self, '_mcp_tool_instances'): self._mcp_tool_instances: dict = {} # tool_name -> McpTool instance + if not hasattr(self, '_mcp_lock'): + # Guards _mcp_loaded check-then-set so concurrent callers + # don't trigger duplicate background loaders. + self._mcp_lock = threading.Lock() + if not hasattr(self, '_mcp_loaded'): + # Idempotency flag. Flipped to True the moment the first loader + # is dispatched (synchronously, inside _mcp_lock). Subsequent + # _load_mcp_tools() calls become no-ops, so per-session agent + # initialization never re-forks MCP subprocesses. + self._mcp_loaded = False + if not hasattr(self, '_mcp_status'): + # server_name -> "pending" / "ready" / "failed" + # Useful for UI / introspection while async loading is in progress. + self._mcp_status: dict = {} def load_tools(self, tools_dir: str = "", config_dict=None): """ @@ -268,34 +283,109 @@ class ToolManager: return _normalize_mcp_configs(raw) def _load_mcp_tools(self): - """Load MCP tools from mcp_servers config. Failures are non-fatal.""" - try: + """ + Trigger MCP tool loading in a background thread (idempotent). + + Returns immediately. Booting MCP servers (npx, uvx, etc.) takes + seconds to tens of seconds on first run, which would otherwise + block agent initialization and the user's first message. + Built-in tools work fine without MCP, so we let the agent serve + traffic right away and let MCP servers come online in the + background. Per-session agents read a snapshot of whatever is + ready at construction time and gracefully ignore the rest. + """ + with self._mcp_lock: + if self._mcp_loaded: + return mcp_servers_config = self._load_mcp_configs() if not mcp_servers_config: + # Mark as loaded even when there is nothing to load, + # so we don't re-read the config file on every call. + self._mcp_loaded = True return - from agent.tools.mcp.mcp_client import McpClientRegistry + # Mark pending immediately so list_mcp_status() callers see + # the in-progress state instead of an empty dict. + for cfg in mcp_servers_config: + name = cfg.get("name", "") + self._mcp_status[name] = "pending" + + self._mcp_loaded = True + threading.Thread( + target=self._load_mcp_tools_async, + args=(mcp_servers_config,), + daemon=True, + name="mcp-loader", + ).start() + logger.info( + f"[ToolManager] MCP loading started in background " + f"({len(mcp_servers_config)} server(s) configured)" + ) + + def _load_mcp_tools_async(self, mcp_servers_config): + """ + Background worker: bring up each MCP server one-by-one and + publish ready tools to _mcp_tool_instances as they come online. + + Server failures are isolated — one bad server cannot block + the others, and never raises out of the worker thread. + """ + try: + from agent.tools.mcp.mcp_client import McpClient, McpClientRegistry from agent.tools.mcp.mcp_tool import McpTool - self._mcp_registry = McpClientRegistry() - self._mcp_registry.start_all(mcp_servers_config) + registry = McpClientRegistry() + self._mcp_registry = registry - for server_name, client in self._mcp_registry.all_clients().items(): + for cfg in mcp_servers_config: + server_name = cfg.get("name", "") try: + client = McpClient(cfg) + if not client.initialize(): + self._mcp_status[server_name] = "failed" + logger.warning( + f"[MCP] Server '{server_name}' failed to initialize — skipping" + ) + continue + tool_schemas = client.list_tools() + added = [] for schema in tool_schemas: tool_name = schema.get("name", "") if not tool_name: continue mcp_tool = McpTool(client, schema, server_name) + # Atomic dict assignment is GIL-safe; readers iterate + # over a list() snapshot to avoid concurrent mutation. self._mcp_tool_instances[tool_name] = mcp_tool - logger.debug(f"[ToolManager] Loaded MCP tool: {tool_name} from server '{server_name}'") - except Exception as e: - logger.warning(f"[ToolManager] Failed to list tools from MCP server '{server_name}': {e}") + added.append(tool_name) - logger.info(f"[ToolManager] Loaded {len(self._mcp_tool_instances)} MCP tool(s) in total") + # Register client into the shared registry only after its + # tools are visible, so callers never see a half-loaded server. + with registry._registry_lock: + registry._clients[server_name] = client + self._mcp_status[server_name] = "ready" + logger.info( + f"[MCP] Server '{server_name}' ready — " + f"{len(added)} tool(s): {added}" + ) + except Exception as e: + self._mcp_status[server_name] = "failed" + logger.warning(f"[MCP] Server '{server_name}' load failed: {e}") + + ready = sum(1 for s in self._mcp_status.values() if s == "ready") + total = len(mcp_servers_config) + logger.info( + f"[ToolManager] MCP loading complete: " + f"{ready}/{total} server(s) ready, " + f"{len(self._mcp_tool_instances)} tool(s) available" + ) except Exception as e: - logger.warning(f"[ToolManager] MCP tool loading failed, skipping: {e}") + logger.warning(f"[ToolManager] MCP background loader crashed: {e}") + + def list_mcp_status(self) -> dict: + """Return {server_name: status} snapshot for UI / debugging.""" + return dict(self._mcp_status) def create_tool(self, name: str) -> BaseTool: """ diff --git a/app.py b/app.py index 4503ac67..c442e071 100644 --- a/app.py +++ b/app.py @@ -274,6 +274,20 @@ def sigterm_handler_wrap(_signo): signal.signal(_signo, func) +def _warmup_mcp_tools(): + """ + Kick off MCP server loading at process startup so subprocesses + (npx / uvx etc.) finish initializing before the first user message + arrives. Returns immediately — the actual work happens on a daemon + thread inside ToolManager. Safe to call when MCP is not configured. + """ + try: + from agent.tools import ToolManager + ToolManager()._load_mcp_tools() + except Exception as e: + logger.warning(f"[App] MCP warmup failed (non-fatal): {e}") + + def _sync_builtin_skills(): """Sync builtin skills from project skills/ to workspace skills/ on startup.""" import shutil @@ -335,6 +349,10 @@ def run(): # Sync builtin skills to workspace before channels start _sync_builtin_skills() + # Kick off MCP server loading in the background so first-message + # latency isn't dominated by npx package downloads. + _warmup_mcp_tools() + logger.info(f"[App] Starting channels: {channel_names}") _channel_mgr = ChannelManager() diff --git a/bridge/agent_initializer.py b/bridge/agent_initializer.py index d580a495..78f01e51 100644 --- a/bridge/agent_initializer.py +++ b/bridge/agent_initializer.py @@ -383,12 +383,16 @@ class AgentInitializer: except Exception as e: logger.warning(f"[AgentInitializer] Failed to load tool {tool_name}: {e}") - # Add MCP tools - for mcp_tool in tool_manager._mcp_tool_instances.values(): - tools.append(mcp_tool) - if tool_manager._mcp_tool_instances and session_id is None: - logger.info(f"[AgentInitializer] Added {len(tool_manager._mcp_tool_instances)} MCP tool(s): " - f"{list(tool_manager._mcp_tool_instances.keys())}") + # Add MCP tools (snapshot to avoid races with the background loader) + mcp_tools_snapshot = list(tool_manager._mcp_tool_instances.items()) + if mcp_tools_snapshot: + for _, mcp_tool in mcp_tools_snapshot: + tools.append(mcp_tool) + if session_id is None: + names = [name for name, _ in mcp_tools_snapshot] + logger.info( + f"[AgentInitializer] Added {len(names)} MCP tool(s): {names}" + ) # Add memory tools if memory_tools: diff --git a/channel/web/static/js/console.js b/channel/web/static/js/console.js index fe0a3104..44ec1467 100644 --- a/channel/web/static/js/console.js +++ b/channel/web/static/js/console.js @@ -50,7 +50,7 @@ const I18N = { config_password_hint: '留空则不启用密码保护', config_password_changed: '密码已更新,请重新登录', config_password_cleared: '密码已清除', - skills_title: '技能管理', skills_desc: '查看、启用或禁用 Agent 技能', skills_hub_btn: '探索技能广场', + skills_title: '技能管理', skills_desc: '查看、启用或禁用 Agent 工具和技能', skills_hub_btn: '探索技能广场', skills_loading: '加载技能中...', skills_loading_desc: '技能加载后将显示在此处', tools_section_title: '内置工具', tools_loading: '加载工具中...', skills_section_title: '技能', skill_enable: '启用', skill_disable: '禁用', @@ -149,7 +149,7 @@ const I18N = { config_password_hint: 'Leave empty to disable password protection', config_password_changed: 'Password updated, please re-login', config_password_cleared: 'Password cleared', - skills_title: 'Skills', skills_desc: 'View, enable, or disable agent skills', skills_hub_btn: 'Skill Hub', + skills_title: 'Skills', skills_desc: 'View, enable, or disable agent tools and skills', skills_hub_btn: 'Skill Hub', skills_loading: 'Loading skills...', skills_loading_desc: 'Skills will be displayed here after loading', tools_section_title: 'Built-in Tools', tools_loading: 'Loading tools...', skills_section_title: 'Skills', skill_enable: 'Enable', skill_disable: 'Disable', diff --git a/config.py b/config.py index 76e19c50..bea33752 100644 --- a/config.py +++ b/config.py @@ -246,15 +246,9 @@ class Config(dict): self.user_datas = {} def __getitem__(self, key): - # 跳过以下划线开头的注释字段 - if not key.startswith("_") and key not in available_setting: - logger.debug("[Config] key '{}' not in available_setting, may not take effect".format(key)) return super().__getitem__(key) def __setitem__(self, key, value): - # 跳过以下划线开头的注释字段 - if not key.startswith("_") and key not in available_setting: - logger.debug("[Config] key '{}' not in available_setting, may not take effect".format(key)) return super().__setitem__(key, value) def get(self, key, default=None): @@ -262,7 +256,7 @@ class Config(dict): if key.startswith("_"): return super().get(key, default) - # 如果key不在available_setting中,直接返回default + # 如果key不在available_setting中,直接走dict的get,返回config.json中实际加载的值(如不存在则返回default) if key not in available_setting: return super().get(key, default)