From 29e66cb18641d637911738499e42666d667c618a Mon Sep 17 00:00:00 2001 From: zhayujie Date: Fri, 8 May 2026 15:40:29 +0800 Subject: [PATCH] fix(mcp): correct hot-reload sync on default Agent --- agent/protocol/agent_stream.py | 9 ++ agent/tools/tool_manager.py | 183 ++++++++++++++++++++++++++++++++- bridge/agent_bridge.py | 31 ++++++ 3 files changed, 220 insertions(+), 3 deletions(-) diff --git a/agent/protocol/agent_stream.py b/agent/protocol/agent_stream.py index 4e077653..75b4f4ff 100644 --- a/agent/protocol/agent_stream.py +++ b/agent/protocol/agent_stream.py @@ -594,6 +594,15 @@ class AgentStreamExecutor: turns = self._identify_complete_turns() logger.info(f"Sending {len(messages)} messages ({len(turns)} turns) to LLM") + # Pull in any MCP tools that finished loading since this turn started. + # Cheap dict reconciliation (microseconds) — lets the agent pick up + # newly available MCP tools mid-conversation without a session restart. + try: + from agent.tools import ToolManager + ToolManager().sync_mcp_into_agent(self) + except Exception as e: + logger.debug(f"[Agent] MCP sync skipped: {e}") + # Prepare tool definitions (OpenAI/Claude format) tools_schema = None if self.tools: diff --git a/agent/tools/tool_manager.py b/agent/tools/tool_manager.py index f20c5f9e..73b76943 100644 --- a/agent/tools/tool_manager.py +++ b/agent/tools/tool_manager.py @@ -64,6 +64,13 @@ class ToolManager: # server_name -> "pending" / "ready" / "failed" # Useful for UI / introspection while async loading is in progress. self._mcp_status: dict = {} + if not hasattr(self, '_mcp_signature'): + # (mtime, sha256) of mcp.json the last time we loaded. + # Used by refresh_mcp_if_changed() to skip re-parsing when nothing changed. + self._mcp_signature: tuple = (None, None) + if not hasattr(self, '_mcp_active_configs'): + # server_name -> normalized config dict, for diff-based reload. + self._mcp_active_configs: dict = {} def load_tools(self, tools_dir: str = "", config_dict=None): """ @@ -257,6 +264,31 @@ class ToolManager: except Exception as e: logger.error(f"Error configuring tools from config: {e}") + def _mcp_json_path(self) -> str: + import os + workspace = os.path.expanduser(conf().get("agent_workspace", "~/cow")) + return os.path.join(workspace, "mcp.json") + + def _read_mcp_json_signature(self): + """ + Return (mtime, sha256_of_bytes) for ~/cow/mcp.json without parsing. + Returns (None, None) if the file doesn't exist or is unreadable. + Cheap enough (one stat + one small read) to call on every agent init. + """ + import os + import hashlib + path = self._mcp_json_path() + try: + mtime = os.path.getmtime(path) + except OSError: + return (None, None) + try: + with open(path, "rb") as f: + digest = hashlib.sha256(f.read()).hexdigest() + except OSError: + return (mtime, None) + return (mtime, digest) + def _load_mcp_configs(self) -> list: """ Load MCP server configs with priority: @@ -266,8 +298,7 @@ class ToolManager: import os import json as _json - workspace = os.path.expanduser(conf().get("agent_workspace", "~/cow")) - mcp_json_path = os.path.join(workspace, "mcp.json") + mcp_json_path = self._mcp_json_path() if os.path.exists(mcp_json_path): try: @@ -298,6 +329,12 @@ class ToolManager: if self._mcp_loaded: return mcp_servers_config = self._load_mcp_configs() + # Snapshot the signature now so future refresh_mcp_if_changed() + # calls can short-circuit when nothing has changed on disk. + self._mcp_signature = self._read_mcp_json_signature() + self._mcp_active_configs = { + cfg.get("name", ""): cfg for cfg in mcp_servers_config + } if not mcp_servers_config: # Mark as loaded even when there is nothing to load, # so we don't re-read the config file on every call. @@ -322,6 +359,88 @@ class ToolManager: f"({len(mcp_servers_config)} server(s) configured)" ) + def refresh_mcp_if_changed(self): + """ + Cheap check whether ~/cow/mcp.json has changed since last load. + If it has, do a diff-based reload: start newly added servers, + shut down removed ones, and restart any whose config was edited. + Untouched servers are left running. + + Designed to be called on every agent creation. The fast path is + a single os.stat() — completely free when nothing has changed. + """ + with self._mcp_lock: + new_sig = self._read_mcp_json_signature() + if new_sig == self._mcp_signature: + return # no-op fast path + + try: + new_configs = self._load_mcp_configs() + except Exception as e: + logger.warning(f"[ToolManager] MCP reload — failed to parse config: {e}") + return + + new_by_name = { + cfg.get("name", ""): cfg for cfg in new_configs + } + old_by_name = self._mcp_active_configs + + added = [n for n in new_by_name if n not in old_by_name] + removed = [n for n in old_by_name if n not in new_by_name] + changed = [ + n for n in new_by_name + if n in old_by_name and new_by_name[n] != old_by_name[n] + ] + + if not (added or removed or changed): + # Signature drifted but content is logically identical + # (e.g. user re-saved the file without edits). Just sync. + self._mcp_signature = new_sig + return + + logger.info( + f"[ToolManager] mcp.json changed — " + f"adding={added}, removing={removed}, restarting={changed}" + ) + + # Tear down removed + changed servers (changed ones get restarted below) + for name in removed + changed: + self._teardown_mcp_server(name) + + # Spin up newly added + changed servers in the background + to_start = [new_by_name[n] for n in added + changed] + if to_start: + for cfg in to_start: + self._mcp_status[cfg.get("name", "")] = "pending" + threading.Thread( + target=self._load_mcp_tools_async, + args=(to_start,), + daemon=True, + name="mcp-loader-reload", + ).start() + + self._mcp_active_configs = new_by_name + self._mcp_signature = new_sig + + def _teardown_mcp_server(self, server_name: str): + """Shut down one MCP server and drop its tools from the registry.""" + if self._mcp_registry is None: + return + client = None + with self._mcp_registry._registry_lock: + client = self._mcp_registry._clients.pop(server_name, None) + if client is not None: + try: + client.shutdown() + except Exception as e: + logger.warning(f"[MCP] Error shutting down '{server_name}': {e}") + # Drop tools that belonged to this server. + for tool_name in list(self._mcp_tool_instances.keys()): + tool = self._mcp_tool_instances.get(tool_name) + if tool is not None and getattr(tool, "server_name", None) == server_name: + self._mcp_tool_instances.pop(tool_name, None) + self._mcp_status.pop(server_name, None) + def _load_mcp_tools_async(self, mcp_servers_config): """ Background worker: bring up each MCP server one-by-one and @@ -374,7 +493,7 @@ class ToolManager: logger.warning(f"[MCP] Server '{server_name}' load failed: {e}") ready = sum(1 for s in self._mcp_status.values() if s == "ready") - total = len(mcp_servers_config) + total = len(self._mcp_status) logger.info( f"[ToolManager] MCP loading complete: " f"{ready}/{total} server(s) ready, " @@ -387,6 +506,64 @@ class ToolManager: """Return {server_name: status} snapshot for UI / debugging.""" return dict(self._mcp_status) + def sync_mcp_into_agent(self, agent) -> tuple: + """ + Reconcile a live agent's tool collection with the current MCP tool registry. + + Adds tools that finished loading after the agent was created, + and removes tools whose MCP server was torn down. Built-in tools + on the agent are left untouched. + + Handles both representations CowAgent uses: + - Agent.tools: list[BaseTool] (default Agent class) + - AgentStream.tools: dict[str, BaseTool] (streaming agent) + + Returns (added_names, removed_names) for logging. + """ + if agent is None or not hasattr(agent, "tools"): + return ([], []) + + from agent.tools.mcp.mcp_tool import McpTool + current = self._mcp_tool_instances + registry_names = set(current.keys()) + + agent_tools = agent.tools + + if isinstance(agent_tools, dict): + agent_mcp_names = { + name for name, tool in agent_tools.items() + if isinstance(tool, McpTool) + } + added = registry_names - agent_mcp_names + removed = agent_mcp_names - registry_names + if not (added or removed): + return ([], []) + for name in added: + agent_tools[name] = current[name] + for name in removed: + agent_tools.pop(name, None) + + elif isinstance(agent_tools, list): + agent_mcp_names = { + t.name for t in agent_tools if isinstance(t, McpTool) + } + added = registry_names - agent_mcp_names + removed = agent_mcp_names - registry_names + if not (added or removed): + return ([], []) + if removed: + agent.tools = [ + t for t in agent_tools + if not (isinstance(t, McpTool) and t.name in removed) + ] + for name in added: + agent.tools.append(current[name]) + + else: + return ([], []) + + return (sorted(added), sorted(removed)) + def create_tool(self, name: str) -> BaseTool: """ Get a new instance of a tool by name. diff --git a/bridge/agent_bridge.py b/bridge/agent_bridge.py index 43cfeb53..8f238d62 100644 --- a/bridge/agent_bridge.py +++ b/bridge/agent_bridge.py @@ -462,6 +462,12 @@ class AgentBridge: except Exception as e: logger.warning(f"[AgentBridge] Failed to clear DB after recovery: {e}") + # Post-message hot-reload: detect edits to ~/cow/mcp.json and + # sync any new/removed MCP tools into the live agent in the + # background. Off the critical path so user latency is unaffected; + # changes take effect on the user's next message. + self._schedule_mcp_hot_reload(agent) + # Check if there are files to send (from send/read tool) if hasattr(agent, 'stream_executor') and hasattr(agent.stream_executor, 'files_to_send'): files_to_send = agent.stream_executor.files_to_send @@ -494,6 +500,31 @@ class AgentBridge: logger.warning(f"[AgentBridge] Failed to clear DB after error: {db_err}") return Reply(ReplyType.ERROR, f"Agent error: {str(e)}") + def _schedule_mcp_hot_reload(self, agent): + """ + Fire-and-forget: detect mcp.json edits and reconcile the agent's + tool dict in the background. Runs after the user's reply is sent, + so any cost (file stat, hash, server boot) never adds to user latency. + Failures are isolated and never raise into the message pipeline. + """ + import threading + from agent.tools import ToolManager + + def _run(): + try: + tm = ToolManager() + tm.refresh_mcp_if_changed() + added, removed = tm.sync_mcp_into_agent(agent) + if added or removed: + logger.info( + f"[AgentBridge] Agent tools synced — " + f"added={added}, removed={removed}" + ) + except Exception as e: + logger.warning(f"[AgentBridge] MCP hot-reload failed (non-fatal): {e}") + + threading.Thread(target=_run, daemon=True, name="mcp-hot-reload").start() + def _create_file_reply(self, file_info: dict, text_response: str, context: Context = None) -> Reply: """ Create a reply for sending files