mirror of
https://github.com/zhayujie/chatgpt-on-wechat.git
synced 2026-06-02 00:57:41 +08:00
fix(mcp): correct hot-reload sync on default Agent
This commit is contained in:
@@ -594,6 +594,15 @@ class AgentStreamExecutor:
|
|||||||
turns = self._identify_complete_turns()
|
turns = self._identify_complete_turns()
|
||||||
logger.info(f"Sending {len(messages)} messages ({len(turns)} turns) to LLM")
|
logger.info(f"Sending {len(messages)} messages ({len(turns)} turns) to LLM")
|
||||||
|
|
||||||
|
# Pull in any MCP tools that finished loading since this turn started.
|
||||||
|
# Cheap dict reconciliation (microseconds) — lets the agent pick up
|
||||||
|
# newly available MCP tools mid-conversation without a session restart.
|
||||||
|
try:
|
||||||
|
from agent.tools import ToolManager
|
||||||
|
ToolManager().sync_mcp_into_agent(self)
|
||||||
|
except Exception as e:
|
||||||
|
logger.debug(f"[Agent] MCP sync skipped: {e}")
|
||||||
|
|
||||||
# Prepare tool definitions (OpenAI/Claude format)
|
# Prepare tool definitions (OpenAI/Claude format)
|
||||||
tools_schema = None
|
tools_schema = None
|
||||||
if self.tools:
|
if self.tools:
|
||||||
|
|||||||
@@ -64,6 +64,13 @@ class ToolManager:
|
|||||||
# server_name -> "pending" / "ready" / "failed"
|
# server_name -> "pending" / "ready" / "failed"
|
||||||
# Useful for UI / introspection while async loading is in progress.
|
# Useful for UI / introspection while async loading is in progress.
|
||||||
self._mcp_status: dict = {}
|
self._mcp_status: dict = {}
|
||||||
|
if not hasattr(self, '_mcp_signature'):
|
||||||
|
# (mtime, sha256) of mcp.json the last time we loaded.
|
||||||
|
# Used by refresh_mcp_if_changed() to skip re-parsing when nothing changed.
|
||||||
|
self._mcp_signature: tuple = (None, None)
|
||||||
|
if not hasattr(self, '_mcp_active_configs'):
|
||||||
|
# server_name -> normalized config dict, for diff-based reload.
|
||||||
|
self._mcp_active_configs: dict = {}
|
||||||
|
|
||||||
def load_tools(self, tools_dir: str = "", config_dict=None):
|
def load_tools(self, tools_dir: str = "", config_dict=None):
|
||||||
"""
|
"""
|
||||||
@@ -257,6 +264,31 @@ class ToolManager:
|
|||||||
except Exception as e:
|
except Exception as e:
|
||||||
logger.error(f"Error configuring tools from config: {e}")
|
logger.error(f"Error configuring tools from config: {e}")
|
||||||
|
|
||||||
|
def _mcp_json_path(self) -> str:
|
||||||
|
import os
|
||||||
|
workspace = os.path.expanduser(conf().get("agent_workspace", "~/cow"))
|
||||||
|
return os.path.join(workspace, "mcp.json")
|
||||||
|
|
||||||
|
def _read_mcp_json_signature(self):
|
||||||
|
"""
|
||||||
|
Return (mtime, sha256_of_bytes) for ~/cow/mcp.json without parsing.
|
||||||
|
Returns (None, None) if the file doesn't exist or is unreadable.
|
||||||
|
Cheap enough (one stat + one small read) to call on every agent init.
|
||||||
|
"""
|
||||||
|
import os
|
||||||
|
import hashlib
|
||||||
|
path = self._mcp_json_path()
|
||||||
|
try:
|
||||||
|
mtime = os.path.getmtime(path)
|
||||||
|
except OSError:
|
||||||
|
return (None, None)
|
||||||
|
try:
|
||||||
|
with open(path, "rb") as f:
|
||||||
|
digest = hashlib.sha256(f.read()).hexdigest()
|
||||||
|
except OSError:
|
||||||
|
return (mtime, None)
|
||||||
|
return (mtime, digest)
|
||||||
|
|
||||||
def _load_mcp_configs(self) -> list:
|
def _load_mcp_configs(self) -> list:
|
||||||
"""
|
"""
|
||||||
Load MCP server configs with priority:
|
Load MCP server configs with priority:
|
||||||
@@ -266,8 +298,7 @@ class ToolManager:
|
|||||||
import os
|
import os
|
||||||
import json as _json
|
import json as _json
|
||||||
|
|
||||||
workspace = os.path.expanduser(conf().get("agent_workspace", "~/cow"))
|
mcp_json_path = self._mcp_json_path()
|
||||||
mcp_json_path = os.path.join(workspace, "mcp.json")
|
|
||||||
|
|
||||||
if os.path.exists(mcp_json_path):
|
if os.path.exists(mcp_json_path):
|
||||||
try:
|
try:
|
||||||
@@ -298,6 +329,12 @@ class ToolManager:
|
|||||||
if self._mcp_loaded:
|
if self._mcp_loaded:
|
||||||
return
|
return
|
||||||
mcp_servers_config = self._load_mcp_configs()
|
mcp_servers_config = self._load_mcp_configs()
|
||||||
|
# Snapshot the signature now so future refresh_mcp_if_changed()
|
||||||
|
# calls can short-circuit when nothing has changed on disk.
|
||||||
|
self._mcp_signature = self._read_mcp_json_signature()
|
||||||
|
self._mcp_active_configs = {
|
||||||
|
cfg.get("name", "<unnamed>"): cfg for cfg in mcp_servers_config
|
||||||
|
}
|
||||||
if not mcp_servers_config:
|
if not mcp_servers_config:
|
||||||
# Mark as loaded even when there is nothing to load,
|
# Mark as loaded even when there is nothing to load,
|
||||||
# so we don't re-read the config file on every call.
|
# so we don't re-read the config file on every call.
|
||||||
@@ -322,6 +359,88 @@ class ToolManager:
|
|||||||
f"({len(mcp_servers_config)} server(s) configured)"
|
f"({len(mcp_servers_config)} server(s) configured)"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
def refresh_mcp_if_changed(self):
|
||||||
|
"""
|
||||||
|
Cheap check whether ~/cow/mcp.json has changed since last load.
|
||||||
|
If it has, do a diff-based reload: start newly added servers,
|
||||||
|
shut down removed ones, and restart any whose config was edited.
|
||||||
|
Untouched servers are left running.
|
||||||
|
|
||||||
|
Designed to be called on every agent creation. The fast path is
|
||||||
|
a single os.stat() — completely free when nothing has changed.
|
||||||
|
"""
|
||||||
|
with self._mcp_lock:
|
||||||
|
new_sig = self._read_mcp_json_signature()
|
||||||
|
if new_sig == self._mcp_signature:
|
||||||
|
return # no-op fast path
|
||||||
|
|
||||||
|
try:
|
||||||
|
new_configs = self._load_mcp_configs()
|
||||||
|
except Exception as e:
|
||||||
|
logger.warning(f"[ToolManager] MCP reload — failed to parse config: {e}")
|
||||||
|
return
|
||||||
|
|
||||||
|
new_by_name = {
|
||||||
|
cfg.get("name", "<unnamed>"): cfg for cfg in new_configs
|
||||||
|
}
|
||||||
|
old_by_name = self._mcp_active_configs
|
||||||
|
|
||||||
|
added = [n for n in new_by_name if n not in old_by_name]
|
||||||
|
removed = [n for n in old_by_name if n not in new_by_name]
|
||||||
|
changed = [
|
||||||
|
n for n in new_by_name
|
||||||
|
if n in old_by_name and new_by_name[n] != old_by_name[n]
|
||||||
|
]
|
||||||
|
|
||||||
|
if not (added or removed or changed):
|
||||||
|
# Signature drifted but content is logically identical
|
||||||
|
# (e.g. user re-saved the file without edits). Just sync.
|
||||||
|
self._mcp_signature = new_sig
|
||||||
|
return
|
||||||
|
|
||||||
|
logger.info(
|
||||||
|
f"[ToolManager] mcp.json changed — "
|
||||||
|
f"adding={added}, removing={removed}, restarting={changed}"
|
||||||
|
)
|
||||||
|
|
||||||
|
# Tear down removed + changed servers (changed ones get restarted below)
|
||||||
|
for name in removed + changed:
|
||||||
|
self._teardown_mcp_server(name)
|
||||||
|
|
||||||
|
# Spin up newly added + changed servers in the background
|
||||||
|
to_start = [new_by_name[n] for n in added + changed]
|
||||||
|
if to_start:
|
||||||
|
for cfg in to_start:
|
||||||
|
self._mcp_status[cfg.get("name", "<unnamed>")] = "pending"
|
||||||
|
threading.Thread(
|
||||||
|
target=self._load_mcp_tools_async,
|
||||||
|
args=(to_start,),
|
||||||
|
daemon=True,
|
||||||
|
name="mcp-loader-reload",
|
||||||
|
).start()
|
||||||
|
|
||||||
|
self._mcp_active_configs = new_by_name
|
||||||
|
self._mcp_signature = new_sig
|
||||||
|
|
||||||
|
def _teardown_mcp_server(self, server_name: str):
|
||||||
|
"""Shut down one MCP server and drop its tools from the registry."""
|
||||||
|
if self._mcp_registry is None:
|
||||||
|
return
|
||||||
|
client = None
|
||||||
|
with self._mcp_registry._registry_lock:
|
||||||
|
client = self._mcp_registry._clients.pop(server_name, None)
|
||||||
|
if client is not None:
|
||||||
|
try:
|
||||||
|
client.shutdown()
|
||||||
|
except Exception as e:
|
||||||
|
logger.warning(f"[MCP] Error shutting down '{server_name}': {e}")
|
||||||
|
# Drop tools that belonged to this server.
|
||||||
|
for tool_name in list(self._mcp_tool_instances.keys()):
|
||||||
|
tool = self._mcp_tool_instances.get(tool_name)
|
||||||
|
if tool is not None and getattr(tool, "server_name", None) == server_name:
|
||||||
|
self._mcp_tool_instances.pop(tool_name, None)
|
||||||
|
self._mcp_status.pop(server_name, None)
|
||||||
|
|
||||||
def _load_mcp_tools_async(self, mcp_servers_config):
|
def _load_mcp_tools_async(self, mcp_servers_config):
|
||||||
"""
|
"""
|
||||||
Background worker: bring up each MCP server one-by-one and
|
Background worker: bring up each MCP server one-by-one and
|
||||||
@@ -374,7 +493,7 @@ class ToolManager:
|
|||||||
logger.warning(f"[MCP] Server '{server_name}' load failed: {e}")
|
logger.warning(f"[MCP] Server '{server_name}' load failed: {e}")
|
||||||
|
|
||||||
ready = sum(1 for s in self._mcp_status.values() if s == "ready")
|
ready = sum(1 for s in self._mcp_status.values() if s == "ready")
|
||||||
total = len(mcp_servers_config)
|
total = len(self._mcp_status)
|
||||||
logger.info(
|
logger.info(
|
||||||
f"[ToolManager] MCP loading complete: "
|
f"[ToolManager] MCP loading complete: "
|
||||||
f"{ready}/{total} server(s) ready, "
|
f"{ready}/{total} server(s) ready, "
|
||||||
@@ -387,6 +506,64 @@ class ToolManager:
|
|||||||
"""Return {server_name: status} snapshot for UI / debugging."""
|
"""Return {server_name: status} snapshot for UI / debugging."""
|
||||||
return dict(self._mcp_status)
|
return dict(self._mcp_status)
|
||||||
|
|
||||||
|
def sync_mcp_into_agent(self, agent) -> tuple:
|
||||||
|
"""
|
||||||
|
Reconcile a live agent's tool collection with the current MCP tool registry.
|
||||||
|
|
||||||
|
Adds tools that finished loading after the agent was created,
|
||||||
|
and removes tools whose MCP server was torn down. Built-in tools
|
||||||
|
on the agent are left untouched.
|
||||||
|
|
||||||
|
Handles both representations CowAgent uses:
|
||||||
|
- Agent.tools: list[BaseTool] (default Agent class)
|
||||||
|
- AgentStream.tools: dict[str, BaseTool] (streaming agent)
|
||||||
|
|
||||||
|
Returns (added_names, removed_names) for logging.
|
||||||
|
"""
|
||||||
|
if agent is None or not hasattr(agent, "tools"):
|
||||||
|
return ([], [])
|
||||||
|
|
||||||
|
from agent.tools.mcp.mcp_tool import McpTool
|
||||||
|
current = self._mcp_tool_instances
|
||||||
|
registry_names = set(current.keys())
|
||||||
|
|
||||||
|
agent_tools = agent.tools
|
||||||
|
|
||||||
|
if isinstance(agent_tools, dict):
|
||||||
|
agent_mcp_names = {
|
||||||
|
name for name, tool in agent_tools.items()
|
||||||
|
if isinstance(tool, McpTool)
|
||||||
|
}
|
||||||
|
added = registry_names - agent_mcp_names
|
||||||
|
removed = agent_mcp_names - registry_names
|
||||||
|
if not (added or removed):
|
||||||
|
return ([], [])
|
||||||
|
for name in added:
|
||||||
|
agent_tools[name] = current[name]
|
||||||
|
for name in removed:
|
||||||
|
agent_tools.pop(name, None)
|
||||||
|
|
||||||
|
elif isinstance(agent_tools, list):
|
||||||
|
agent_mcp_names = {
|
||||||
|
t.name for t in agent_tools if isinstance(t, McpTool)
|
||||||
|
}
|
||||||
|
added = registry_names - agent_mcp_names
|
||||||
|
removed = agent_mcp_names - registry_names
|
||||||
|
if not (added or removed):
|
||||||
|
return ([], [])
|
||||||
|
if removed:
|
||||||
|
agent.tools = [
|
||||||
|
t for t in agent_tools
|
||||||
|
if not (isinstance(t, McpTool) and t.name in removed)
|
||||||
|
]
|
||||||
|
for name in added:
|
||||||
|
agent.tools.append(current[name])
|
||||||
|
|
||||||
|
else:
|
||||||
|
return ([], [])
|
||||||
|
|
||||||
|
return (sorted(added), sorted(removed))
|
||||||
|
|
||||||
def create_tool(self, name: str) -> BaseTool:
|
def create_tool(self, name: str) -> BaseTool:
|
||||||
"""
|
"""
|
||||||
Get a new instance of a tool by name.
|
Get a new instance of a tool by name.
|
||||||
|
|||||||
@@ -462,6 +462,12 @@ class AgentBridge:
|
|||||||
except Exception as e:
|
except Exception as e:
|
||||||
logger.warning(f"[AgentBridge] Failed to clear DB after recovery: {e}")
|
logger.warning(f"[AgentBridge] Failed to clear DB after recovery: {e}")
|
||||||
|
|
||||||
|
# Post-message hot-reload: detect edits to ~/cow/mcp.json and
|
||||||
|
# sync any new/removed MCP tools into the live agent in the
|
||||||
|
# background. Off the critical path so user latency is unaffected;
|
||||||
|
# changes take effect on the user's next message.
|
||||||
|
self._schedule_mcp_hot_reload(agent)
|
||||||
|
|
||||||
# Check if there are files to send (from send/read tool)
|
# Check if there are files to send (from send/read tool)
|
||||||
if hasattr(agent, 'stream_executor') and hasattr(agent.stream_executor, 'files_to_send'):
|
if hasattr(agent, 'stream_executor') and hasattr(agent.stream_executor, 'files_to_send'):
|
||||||
files_to_send = agent.stream_executor.files_to_send
|
files_to_send = agent.stream_executor.files_to_send
|
||||||
@@ -494,6 +500,31 @@ class AgentBridge:
|
|||||||
logger.warning(f"[AgentBridge] Failed to clear DB after error: {db_err}")
|
logger.warning(f"[AgentBridge] Failed to clear DB after error: {db_err}")
|
||||||
return Reply(ReplyType.ERROR, f"Agent error: {str(e)}")
|
return Reply(ReplyType.ERROR, f"Agent error: {str(e)}")
|
||||||
|
|
||||||
|
def _schedule_mcp_hot_reload(self, agent):
|
||||||
|
"""
|
||||||
|
Fire-and-forget: detect mcp.json edits and reconcile the agent's
|
||||||
|
tool dict in the background. Runs after the user's reply is sent,
|
||||||
|
so any cost (file stat, hash, server boot) never adds to user latency.
|
||||||
|
Failures are isolated and never raise into the message pipeline.
|
||||||
|
"""
|
||||||
|
import threading
|
||||||
|
from agent.tools import ToolManager
|
||||||
|
|
||||||
|
def _run():
|
||||||
|
try:
|
||||||
|
tm = ToolManager()
|
||||||
|
tm.refresh_mcp_if_changed()
|
||||||
|
added, removed = tm.sync_mcp_into_agent(agent)
|
||||||
|
if added or removed:
|
||||||
|
logger.info(
|
||||||
|
f"[AgentBridge] Agent tools synced — "
|
||||||
|
f"added={added}, removed={removed}"
|
||||||
|
)
|
||||||
|
except Exception as e:
|
||||||
|
logger.warning(f"[AgentBridge] MCP hot-reload failed (non-fatal): {e}")
|
||||||
|
|
||||||
|
threading.Thread(target=_run, daemon=True, name="mcp-hot-reload").start()
|
||||||
|
|
||||||
def _create_file_reply(self, file_info: dict, text_response: str, context: Context = None) -> Reply:
|
def _create_file_reply(self, file_info: dict, text_response: str, context: Context = None) -> Reply:
|
||||||
"""
|
"""
|
||||||
Create a reply for sending files
|
Create a reply for sending files
|
||||||
|
|||||||
Reference in New Issue
Block a user