From b861eef26f6cace05bf736cee0e80ca9c7fe0ce7 Mon Sep 17 00:00:00 2001 From: ooaaooaa123 Date: Fri, 8 May 2026 09:58:40 +0800 Subject: [PATCH] fix(mcp): address PR review feedback on stability and config Stability fixes in mcp_client.py: - Fix stderr buffer overflow: start daemon thread to continuously drain stderr pipe, preventing 64KB buffer fill that blocks child process - Fix notification interference: loop readline and skip JSON-RPC messages without 'id' field (notifications) instead of treating them as responses - Fix concurrent race condition: wrap send+receive in _call_lock so multiple sessions cannot interleave reads/writes on the same client - Fix missing timeout: use select.select() with 30s timeout in _readline_with_timeout() to prevent infinite block on dead MCP server Config improvements in tool_manager.py: - Add _normalize_mcp_configs() to support both list format (mcp_servers) and dict format (mcpServers used by Claude Desktop / Cursor) - Add _load_mcp_configs() to load from ~/cow/mcp.json first, falling back to config.json mcp_servers field for backward compatibility Co-Authored-By: Claude Sonnet 4.6 --- agent/tools/mcp/mcp_client.py | 44 ++++++++++++++++++++++++++------ agent/tools/tool_manager.py | 47 ++++++++++++++++++++++++++++++++++- 2 files changed, 82 insertions(+), 9 deletions(-) diff --git a/agent/tools/mcp/mcp_client.py b/agent/tools/mcp/mcp_client.py index 281d3616..694a0c46 100644 --- a/agent/tools/mcp/mcp_client.py +++ b/agent/tools/mcp/mcp_client.py @@ -7,6 +7,7 @@ MCP SDK dependency. import json import os +import select import subprocess import threading import urllib.request @@ -39,6 +40,7 @@ class McpClient: # Shared state self._next_id = 1 self._id_lock = threading.Lock() + self._call_lock = threading.Lock() self._initialized = False # ------------------------------------------------------------------ @@ -133,8 +135,26 @@ class McpClient: env=env, ) logger.debug(f"[MCP:{self.name}] stdio process started (pid={self._proc.pid})") + + threading.Thread( + target=self._drain_stderr, daemon=True, name=f"mcp-stderr-{self.name}" + ).start() + return self._handshake() + def _drain_stderr(self): + for line in self._proc.stderr: + line = line.strip() + if line: + logger.debug(f"[MCP:{self.name}] stderr: {line}") + + def _readline_with_timeout(self, timeout: int = 30) -> str: + """Read one line from stdio stdout with a hard timeout.""" + ready, _, _ = select.select([self._proc.stdout], [], [], timeout) + if not ready: + raise TimeoutError(f"[MCP:{self.name}] stdio read timed out after {timeout}s") + return self._proc.stdout.readline() + def _stdio_send(self, message: dict) -> dict: """Send a JSON-RPC message over stdio and read the response.""" raw = json.dumps(message) + "\n" @@ -142,13 +162,20 @@ class McpClient: self._proc.stdin.flush() while True: - line = self._proc.stdout.readline() + line = self._readline_with_timeout() if not line: raise IOError(f"[MCP:{self.name}] stdio process closed unexpectedly") line = line.strip() if not line: continue - return json.loads(line) + try: + data = json.loads(line) + except json.JSONDecodeError: + continue + if "id" not in data: + logger.debug(f"[MCP:{self.name}] notification skipped: {data.get('method', '?')}") + continue + return data # ------------------------------------------------------------------ # SSE transport @@ -235,12 +262,13 @@ class McpClient: message = self._build_request(method, params) - if self.transport == "stdio": - return self._stdio_send(message) - elif self.transport == "sse": - return self._sse_send(message) - else: - raise ValueError(f"[MCP:{self.name}] Unsupported transport: {self.transport}") + with self._call_lock: + if self.transport == "stdio": + return self._stdio_send(message) + elif self.transport == "sse": + return self._sse_send(message) + else: + raise ValueError(f"[MCP:{self.name}] Unsupported transport: {self.transport}") def _send_notification(self, method: str, params: dict): """Fire-and-forget notification (no response expected).""" diff --git a/agent/tools/tool_manager.py b/agent/tools/tool_manager.py index 4a40474e..f0b3cc09 100644 --- a/agent/tools/tool_manager.py +++ b/agent/tools/tool_manager.py @@ -7,6 +7,26 @@ from common.log import logger from config import conf +def _normalize_mcp_configs(raw) -> list: + """ + Convert MCP server config to internal list format. + Supports: + - list format (mcp_servers): [{"name": "x", "type": "stdio", ...}] + - dict format (mcpServers): {"x": {"command": "npx", ...}} + """ + if isinstance(raw, list): + return raw + if isinstance(raw, dict): + result = [] + for name, cfg in raw.items(): + entry = {"name": name, **cfg} + if "type" not in entry: + entry["type"] = "sse" if "url" in entry else "stdio" + result.append(entry) + return result + return [] + + class ToolManager: """ Tool manager for managing tools. @@ -222,10 +242,35 @@ class ToolManager: except Exception as e: logger.error(f"Error configuring tools from config: {e}") + def _load_mcp_configs(self) -> list: + """ + Load MCP server configs with priority: + 1. ~/cow/mcp.json (supports both mcpServers and mcp_servers keys) + 2. config.json mcp_servers field (fallback) + """ + import os + import json as _json + + workspace = os.path.expanduser(conf().get("agent_workspace", "~/cow")) + mcp_json_path = os.path.join(workspace, "mcp.json") + + if os.path.exists(mcp_json_path): + try: + with open(mcp_json_path, "r", encoding="utf-8") as f: + data = _json.load(f) + raw = data.get("mcpServers") or data.get("mcp_servers") or data + logger.info(f"[ToolManager] Loading MCP config from {mcp_json_path}") + return _normalize_mcp_configs(raw) + except Exception as e: + logger.warning(f"[ToolManager] Failed to read {mcp_json_path}: {e}, falling back to config.json") + + raw = conf().get("mcp_servers", []) + return _normalize_mcp_configs(raw) + def _load_mcp_tools(self): """Load MCP tools from mcp_servers config. Failures are non-fatal.""" try: - mcp_servers_config = conf().get("mcp_servers", []) + mcp_servers_config = self._load_mcp_configs() if not mcp_servers_config: return