From ad2db1a776d5efcfe584752330760a4bd039b35e Mon Sep 17 00:00:00 2001 From: zhayujie Date: Tue, 26 May 2026 12:10:03 +0800 Subject: [PATCH] feat(mcp): support streamable-http mcp protocol --- README.md | 4 +- agent/tools/mcp/mcp_client.py | 166 ++++++++++++++++++++++++++++++++-- docs/en/tools/mcp.mdx | 9 +- docs/ja/README.md | 4 +- docs/ja/tools/mcp.mdx | 9 +- docs/tools/mcp.mdx | 9 +- docs/zh/README.md | 4 +- 7 files changed, 184 insertions(+), 21 deletions(-) diff --git a/README.md b/README.md index 513ea959..2a72f513 100644 --- a/README.md +++ b/README.md @@ -104,7 +104,7 @@ CowAgent supports all mainstream LLM providers. **Chat, vision, image generation | [Kimi](https://docs.cowagent.ai/en/models/kimi) | kimi-k2.6 | ✅ | ✅ | | | | | | [MiniMax](https://docs.cowagent.ai/en/models/minimax) | MiniMax-M2.7 | ✅ | ✅ | ✅ | | ✅ | | | [ERNIE](https://docs.cowagent.ai/en/models/qianfan) | ernie-5.1 | ✅ | ✅ | | | | | -| [LinkAI](https://docs.cowagent.ai/en/models/linkai) | 100+ models, unified gateway | ✅ | ✅ | ✅ | ✅ | ✅ | ✅ | +| [LinkAI](https://docs.cowagent.ai/en/models/linkai) | One key for 100+ models | ✅ | ✅ | ✅ | ✅ | ✅ | ✅ | | [Custom](https://docs.cowagent.ai/en/models/custom) | Local models / third-party proxy | ✅ | | | | | | > For details on each provider, see the [Models overview](https://docs.cowagent.ai/en/models/index). @@ -222,7 +222,7 @@ Full history: [Release Notes](https://docs.cowagent.ai/en/releases/overview) [**LinkAI**](https://link-ai.tech/) is an all-in-one AI Agent platform for enterprises and developers, offering managed hosting and enterprise-grade support for CowAgent: - **🚀 Zero-deployment hosted runtime** — spin up a [CowAgent online assistant](https://link-ai.tech/cowagent/create) in under a minute, no server required -- **🧠 Aggregated models & skill marketplace** — unified access to mainstream LLMs and an official skill marketplace, extending CowAgent's reach +- **🧠 Agent infrastructure** — unified access to LLMs, knowledge bases, databases, skills, and workflows; plug-and-play building blocks that extend what CowAgent can do - **🏢 Team & enterprise features** — workspaces, role-based access, audit logs, and private deployment for production use cases For enterprise inquiries: sales@simple-future.tech or [scan the QR code](https://cdn.link-ai.tech/consultant.jpg) to reach our team on WeChat. diff --git a/agent/tools/mcp/mcp_client.py b/agent/tools/mcp/mcp_client.py index 694a0c46..be93c716 100644 --- a/agent/tools/mcp/mcp_client.py +++ b/agent/tools/mcp/mcp_client.py @@ -1,8 +1,8 @@ """ MCP (Model Context Protocol) client module. -Implements JSON-RPC 2.0 over stdio and SSE transports without any external -MCP SDK dependency. +Implements JSON-RPC 2.0 over stdio, SSE and Streamable HTTP transports +without any external MCP SDK dependency. """ import json @@ -17,18 +17,29 @@ from typing import Optional from common.log import logger +# Aliases accepted for the Streamable HTTP transport type +_STREAMABLE_HTTP_ALIASES = {"streamable-http", "streamable_http", "streamablehttp", "http"} + + class McpClient: - """Single MCP Server client supporting stdio and SSE transports.""" + """Single MCP Server client supporting stdio, SSE and Streamable HTTP transports.""" def __init__(self, config: dict): """ config examples: - stdio: {"name": "filesystem", "type": "stdio", "command": "npx", "args": [...]} - SSE: {"name": "my-api", "type": "sse", "url": "http://localhost:8000/sse"} + stdio: {"name": "filesystem", "type": "stdio", "command": "npx", "args": [...]} + SSE: {"name": "my-api", "type": "sse", "url": "http://localhost:8000/sse"} + streamable-http: {"name": "pubmed", "type": "streamable-http", "url": "https://x/mcp"} """ self.config = config self.name: str = config.get("name", "unknown") - self.transport: str = config.get("type", "stdio") + raw_transport: str = config.get("type", "stdio") + # Normalize streamable-http aliases to a single internal key + self.transport: str = ( + "streamable-http" + if raw_transport.lower() in _STREAMABLE_HTTP_ALIASES + else raw_transport + ) # stdio state self._proc: Optional[subprocess.Popen] = None @@ -37,6 +48,11 @@ class McpClient: self._sse_url: Optional[str] = None self._post_url: Optional[str] = None # endpoint for sending messages (resolved from SSE) + # Streamable HTTP state + self._http_url: Optional[str] = None + self._http_headers: dict = {} # extra headers from user config (e.g. Authorization) + self._http_session_id: Optional[str] = None # Mcp-Session-Id assigned by the server + # Shared state self._next_id = 1 self._id_lock = threading.Lock() @@ -54,6 +70,8 @@ class McpClient: return self._init_stdio() elif self.transport == "sse": return self._init_sse() + elif self.transport == "streamable-http": + return self._init_streamable_http() else: logger.warning(f"[MCP:{self.name}] Unknown transport type: {self.transport!r}") return False @@ -109,6 +127,21 @@ class McpClient: pass self._proc = None logger.debug(f"[MCP:{self.name}] stdio process terminated") + + # Best-effort streamable-http session termination + if self.transport == "streamable-http" and self._http_session_id and self._http_url: + try: + req = urllib.request.Request( + self._http_url, + method="DELETE", + headers={"Mcp-Session-Id": self._http_session_id, **self._http_headers}, + ) + with urllib.request.urlopen(req, timeout=5): + pass + except Exception: + pass + self._http_session_id = None + self._initialized = False # ------------------------------------------------------------------ @@ -234,6 +267,120 @@ class McpClient: raw = resp.read().decode("utf-8") return json.loads(raw) + # ------------------------------------------------------------------ + # Streamable HTTP transport (MCP spec 2025-03-26) + # ------------------------------------------------------------------ + + def _init_streamable_http(self) -> bool: + url = self.config.get("url") + if not url: + logger.warning(f"[MCP:{self.name}] streamable-http config missing 'url'") + return False + + self._http_url = url + # Allow user-provided headers (e.g. {"Authorization": "Bearer xxx"}) + extra_headers = self.config.get("headers") or {} + if isinstance(extra_headers, dict): + self._http_headers = {str(k): str(v) for k, v in extra_headers.items()} + + return self._handshake() + + def _streamable_http_send(self, message: dict) -> dict: + """POST a JSON-RPC request and return the response (JSON or SSE-wrapped).""" + return self._streamable_http_post(message, expect_response=True) + + def _streamable_http_post(self, message: dict, expect_response: bool) -> dict: + """ + POST a JSON-RPC message over Streamable HTTP. + + Per the spec, the response Content-Type can be either: + - application/json -> single JSON-RPC response in body + - text/event-stream -> SSE stream; we read until we get a matching response + """ + body = json.dumps(message).encode("utf-8") + headers = { + "Content-Type": "application/json", + "Accept": "application/json, text/event-stream", + } + if self._http_session_id: + headers["Mcp-Session-Id"] = self._http_session_id + headers.update(self._http_headers) + + req = urllib.request.Request( + self._http_url, + data=body, + method="POST", + headers=headers, + ) + + try: + resp = urllib.request.urlopen(req, timeout=30) + except urllib.error.HTTPError as e: + # Surface the server-provided error body for easier debugging + detail = "" + try: + detail = e.read().decode("utf-8", errors="ignore") + except Exception: + pass + raise IOError( + f"[MCP:{self.name}] streamable-http HTTP {e.code}: {detail[:200]}" + ) + + with resp: + # Capture session id assigned by the server (if any) + session_id = resp.headers.get("Mcp-Session-Id") + if session_id and not self._http_session_id: + self._http_session_id = session_id + + status = resp.status if hasattr(resp, "status") else resp.getcode() + + # Notifications: server may reply with 202 Accepted and no body + if not expect_response or status == 202: + try: + resp.read() + except Exception: + pass + return {} + + content_type = (resp.headers.get("Content-Type") or "").lower() + expected_id = message.get("id") + + if "text/event-stream" in content_type: + return self._read_sse_response(resp, expected_id) + + raw = resp.read().decode("utf-8") + if not raw: + return {} + return json.loads(raw) + + def _read_sse_response(self, resp, expected_id) -> dict: + """Read an SSE stream and return the first JSON-RPC response with matching id.""" + data_buf: list = [] + for raw_line in resp: + line = raw_line.decode("utf-8").rstrip("\n\r") + if line == "": + # End of an SSE event, attempt to parse accumulated data + if data_buf: + payload = "\n".join(data_buf) + data_buf = [] + try: + msg = json.loads(payload) + except json.JSONDecodeError: + continue + # Skip notifications / mismatched ids + if "id" not in msg: + continue + if expected_id is None or msg.get("id") == expected_id: + return msg + continue + if line.startswith(":"): + continue # SSE comment / keepalive + if line.startswith("data:"): + data_buf.append(line[len("data:"):].lstrip()) + # Ignore 'event:' / 'id:' lines; we only care about JSON-RPC payloads + + raise IOError(f"[MCP:{self.name}] streamable-http SSE stream closed before response") + # ------------------------------------------------------------------ # Common JSON-RPC helpers # ------------------------------------------------------------------ @@ -267,6 +414,8 @@ class McpClient: return self._stdio_send(message) elif self.transport == "sse": return self._sse_send(message) + elif self.transport == "streamable-http": + return self._streamable_http_send(message) else: raise ValueError(f"[MCP:{self.name}] Unsupported transport: {self.transport}") @@ -291,6 +440,11 @@ class McpClient: pass except Exception: pass # notifications are fire-and-forget + elif self.transport == "streamable-http": + try: + self._streamable_http_post(notification, expect_response=False) + except Exception: + pass # notifications are fire-and-forget def _handshake(self) -> bool: """Perform the MCP initialize / notifications/initialized handshake.""" diff --git a/docs/en/tools/mcp.mdx b/docs/en/tools/mcp.mdx index 9978c46e..fc320fe0 100644 --- a/docs/en/tools/mcp.mdx +++ b/docs/en/tools/mcp.mdx @@ -34,7 +34,9 @@ Fully compatible with the MCP community standard, identical to Claude Desktop / | `command` | stdio | Executable to launch the server (e.g. `npx`, `python`, `uvx`) | | `args` | No | Arguments passed to `command` | | `env` | No | Environment variables for the subprocess, commonly used for API keys | -| `url` | SSE | SSE endpoint URL (alternative to `command`) | +| `url` | SSE / Streamable HTTP | Remote endpoint URL (alternative to `command`) | +| `type` | Remote | Remote transport type: `sse` or `streamable-http` (defaults to `sse`) | +| `headers` | No | Extra HTTP headers for remote requests (e.g. `Authorization`); Streamable HTTP only | | `disabled` | No | When `true`, this server is skipped — handy for temporary disabling | ### Full Example @@ -88,7 +90,8 @@ The Agent will: | Transport | Description | Config Field | | --- | --- | --- | | **stdio** | Subprocess communication. The most common option, with the richest community ecosystem. | `command` + `args` | -| **SSE** | HTTP Server-Sent Events, suitable for remotely hosted MCP services. | `url` | +| **SSE** | HTTP Server-Sent Events. Legacy remote transport. | `url` (default) | +| **Streamable HTTP** | New unified remote transport, gradually replacing SSE. | `type: "streamable-http"` + `url` | ## Troubleshooting @@ -106,4 +109,4 @@ You can browse third-party MCP marketplaces and copy a JSON config to use direct - [mcp.so](https://mcp.so) — Global MCP service index - [ModelScope MCP Hub](https://modelscope.cn/mcp) — ModelScope's MCP hub, more reliable from mainland China -Any MCP server that follows the standard protocol (stdio / SSE) integrates with CowAgent out of the box. +Any MCP server that follows the standard protocol (stdio / SSE / Streamable HTTP) integrates with CowAgent out of the box. diff --git a/docs/ja/README.md b/docs/ja/README.md index 62dba7a5..b68a82d0 100644 --- a/docs/ja/README.md +++ b/docs/ja/README.md @@ -104,7 +104,7 @@ CowAgent は主要な LLM プロバイダーすべてに対応しています。 | [Kimi](https://docs.cowagent.ai/ja/models/kimi) | kimi-k2.6 | ✅ | ✅ | | | | | | [MiniMax](https://docs.cowagent.ai/ja/models/minimax) | MiniMax-M2.7 | ✅ | ✅ | ✅ | | ✅ | | | [ERNIE](https://docs.cowagent.ai/ja/models/qianfan) | ernie-5.1 | ✅ | ✅ | | | | | -| [LinkAI](https://docs.cowagent.ai/ja/models/linkai) | 100+ モデルを統一ゲートウェイで提供 | ✅ | ✅ | ✅ | ✅ | ✅ | ✅ | +| [LinkAI](https://docs.cowagent.ai/ja/models/linkai) | 1 つの Key で 100+ モデルに接続 | ✅ | ✅ | ✅ | ✅ | ✅ | ✅ | | [カスタム](https://docs.cowagent.ai/ja/models/custom) | ローカルモデル / サードパーティプロキシ | ✅ | | | | | | > Web コンソールでの設定が推奨されており、ファイルを手動編集する必要はありません。手動設定については各プロバイダーのドキュメントおよび [モデル概要](https://docs.cowagent.ai/ja/models/index) を参照してください。 @@ -222,7 +222,7 @@ GitHub で [Issue を報告](https://github.com/zhayujie/CowAgent/issues) する [**LinkAI**](https://link-ai.tech/) は企業や開発者向けのワンストップ AI Agent プラットフォームで、CowAgent にマネージドホスティングとエンタープライズグレードのサポートを提供します: - **🚀 デプロイ不要のホスト型ランタイム** — [CowAgent オンラインアシスタント](https://link-ai.tech/cowagent/create) を 1 分以内に起動、サーバー不要 -- **🧠 統合モデル & Skill マーケットプレイス** — 主要 LLM への統一アクセスと公式 Skill マーケットプレイスで CowAgent の活用範囲を拡大 +- **🧠 Agent インフラ** — 主要 LLM・ナレッジベース・データベース・Skill・ワークフローへの統一アクセス。CowAgent の機能を拡張する、すぐに使えるビルディングブロック - **🏢 チーム & エンタープライズ機能** — ワークスペース、ロールベースのアクセス制御、監査ログ、本番運用向けプライベートデプロイ エンタープライズに関するお問い合わせ:**sales@simple-future.tech** または [QR コードをスキャン](https://cdn.link-ai.tech/consultant.jpg) して WeChat でお問い合わせください。 diff --git a/docs/ja/tools/mcp.mdx b/docs/ja/tools/mcp.mdx index e450a099..efd4a514 100644 --- a/docs/ja/tools/mcp.mdx +++ b/docs/ja/tools/mcp.mdx @@ -34,7 +34,9 @@ MCP コミュニティ標準に完全準拠しており、Claude Desktop / Curso | `command` | stdio | サーバーを起動する実行コマンド(`npx`、`python`、`uvx` など) | | `args` | 任意 | `command` に渡す引数 | | `env` | 任意 | サブプロセスの環境変数。API Key などに利用 | -| `url` | SSE | SSE エンドポイントの URL(`command` と二者択一) | +| `url` | SSE / Streamable HTTP | リモートエンドポイントの URL(`command` と二者択一) | +| `type` | リモート | リモートトランスポート種別:`sse` または `streamable-http`(既定は `sse`) | +| `headers` | 任意 | リモートリクエストの追加 HTTP ヘッダ(`Authorization` など)。Streamable HTTP のみ | | `disabled` | 任意 | `true` のとき該当サーバーをスキップ。一時的に無効化したいときに便利 | ### 完全な例 @@ -88,7 +90,8 @@ Agent は次のように動作します: | トランスポート | 説明 | 設定フィールド | | --- | --- | --- | | **stdio** | サブプロセス通信。最も一般的で、コミュニティのエコシステムが最も豊富 | `command` + `args` | -| **SSE** | HTTP Server-Sent Events。リモートホスト型の MCP サービス向け | `url` | +| **SSE** | HTTP Server-Sent Events。従来のリモート用トランスポート | `url`(既定) | +| **Streamable HTTP** | 新しい単一エンドポイント方式。SSE を段階的に置き換え | `type: "streamable-http"` + `url` | ## トラブルシューティング @@ -106,4 +109,4 @@ Agent は次のように動作します: - [mcp.so](https://mcp.so) — グローバル MCP サービスインデックス - [ModelScope MCP 広場](https://modelscope.cn/mcp) — 魔搭コミュニティの MCP 広場、中国本土からのアクセスが安定 -MCP 標準プロトコル(stdio / SSE)に準拠していれば、コードを一切変更せずに CowAgent に統合できます。 +MCP 標準プロトコル(stdio / SSE / Streamable HTTP)に準拠していれば、コードを一切変更せずに CowAgent に統合できます。 diff --git a/docs/tools/mcp.mdx b/docs/tools/mcp.mdx index 0973f25a..8b7670c1 100644 --- a/docs/tools/mcp.mdx +++ b/docs/tools/mcp.mdx @@ -34,7 +34,9 @@ Docker 部署时,官方 `docker-compose.yml` 已经把宿主机 `./cow` 挂载 | `command` | stdio | 启动 server 的可执行命令(如 `npx`、`python`、`uvx`) | | `args` | 否 | 传给 command 的参数列表 | | `env` | 否 | 子进程的环境变量,常用于 API Key | -| `url` | SSE | SSE 端点 URL(与 `command` 二选一) | +| `url` | SSE / Streamable HTTP | 远程端点 URL(与 `command` 二选一) | +| `type` | 远程 | 远程传输类型,可选 `sse` 或 `streamable-http`,默认 `sse` | +| `headers` | 否 | 远程请求附加 HTTP 头(如 `Authorization`),仅 Streamable HTTP 使用 | | `disabled` | 否 | `true` 时跳过该 server,便于临时关闭 | ### 完整示例 @@ -88,7 +90,8 @@ Agent 会: | 协议 | 说明 | 配置字段 | | --- | --- | --- | | **stdio** | 子进程通信,最常见,社区生态最丰富 | `command` + `args` | -| **SSE** | HTTP Server-Sent Events,适合远程托管的 MCP 服务 | `url` | +| **SSE** | HTTP Server-Sent Events,旧版远程协议 | `url`(默认) | +| **Streamable HTTP** | 新版远程协议,单端点收发,逐步取代 SSE | `type: "streamable-http"` + `url` | ## 排错 @@ -106,4 +109,4 @@ Agent 会: - [mcp.so](https://mcp.so) — 全球 MCP 服务索引 - [ModelScope MCP 广场](https://modelscope.cn/mcp) — 魔搭社区 MCP 广场,国内访问更稳定 -只要遵循 MCP 标准协议(stdio / SSE),都可以直接接入 CowAgent。 +只要遵循 MCP 标准协议(stdio / SSE / Streamable HTTP),都可以直接接入 CowAgent。 diff --git a/docs/zh/README.md b/docs/zh/README.md index 63daf22e..db54626e 100644 --- a/docs/zh/README.md +++ b/docs/zh/README.md @@ -104,7 +104,7 @@ CowAgent 支持国内外主流厂商的大语言模型。**文本对话、图像 | [豆包 Doubao](https://docs.cowagent.ai/models/doubao) | doubao-seed-2.0 系列 | ✅ | ✅ | ✅ | | | ✅ | | [Kimi](https://docs.cowagent.ai/models/kimi) | kimi-k2.6 | ✅ | ✅ | | | | | | [百度ERNIE](https://docs.cowagent.ai/models/qianfan) | ernie-5.1 | ✅ | ✅ | | | | | -| [LinkAI](https://docs.cowagent.ai/models/linkai) | 100+ 模型统一接入 | ✅ | ✅ | ✅ | ✅ | ✅ | ✅ | +| [LinkAI](https://docs.cowagent.ai/models/linkai) | 一个 Key 接入 100+ 模型 | ✅ | ✅ | ✅ | ✅ | ✅ | ✅ | | [自定义](https://docs.cowagent.ai/models/custom) | 本地模型 / 三方代理 | ✅ | | | | | | > 推荐通过 Web 控制台在线配置,无需手动编辑文件。手动配置请参考各厂商文档,详见 [模型概览](https://docs.cowagent.ai/models)。 @@ -232,7 +232,7 @@ CowAgent 支持国内外主流厂商的大语言模型。**文本对话、图像 > [LinkAI](https://link-ai.tech/) 是面向企业和个人的一站式 AI 智能体平台,为 CowAgent 提供云端托管和企业级支持: > > - **🚀 免部署在线运行**:无需服务器即可创建 [CowAgent 在线助理](https://link-ai.tech/cowagent/create),1 分钟拥有专属 Agent -> - **🧠 模型与技能支持**:聚合主流大模型与官方技能市场,为 CowAgent 提供更广的模型与技能扩展 +> - **🧠 Agent 基础设施**:聚合主流大模型、知识库、数据库、技能、工作流,提供开箱即用的 Agent 能力扩展 > - **🏢 企业级协作**:提供团队协作、权限分级、审计日志、私有化部署等能力,让 Agent 安全落地企业场景 **产品咨询和企业服务** 可联系产品客服: