Merge branch 'master' of github.com:zhayujie/chatgpt-on-wechat

This commit is contained in:
zhayujie
2026-05-30 17:18:51 +08:00
15 changed files with 1403 additions and 15 deletions

1
app.py
View File

@@ -231,6 +231,7 @@ def _clear_singleton_cache(channel_name: str):
"wechatmp": "channel.wechatmp.wechatmp_channel.WechatMPChannel",
"wechatmp_service": "channel.wechatmp.wechatmp_channel.WechatMPChannel",
"wechatcom_app": "channel.wechatcom.wechatcomapp_channel.WechatComAppChannel",
const.WECHAT_KF: "channel.wechat_kf.wechat_kf_channel.WechatKfChannel",
const.FEISHU: "channel.feishu.feishu_channel.FeiShuChanel",
const.DINGTALK: "channel.dingtalk.dingtalk_channel.DingTalkChanel",
const.WECOM_BOT: "channel.wecom_bot.wecom_bot_channel.WecomBotChannel",

View File

@@ -27,6 +27,9 @@ def create_channel(channel_type) -> Channel:
elif channel_type == "wechatcom_app":
from channel.wechatcom.wechatcomapp_channel import WechatComAppChannel
ch = WechatComAppChannel()
elif channel_type == const.WECHAT_KF:
from channel.wechat_kf.wechat_kf_channel import WechatKfChannel
ch = WechatKfChannel()
elif channel_type == const.FEISHU:
from channel.feishu.feishu_channel import FeiShuChanel
ch = FeiShuChanel()

View File

@@ -1028,8 +1028,9 @@ class WebChannel(ChatChannel):
logger.info("[WebChannel] 5. dingtalk - 钉钉")
logger.info("[WebChannel] 6. wecom_bot - 企微智能机器人")
logger.info("[WebChannel] 7. wechatcom_app - 企微自建应用")
logger.info("[WebChannel] 8. wechatmp - 个人公众号")
logger.info("[WebChannel] 9. wechatmp_service - 企业公众号")
logger.info("[WebChannel] 8. wechat_kf - 微信客服")
logger.info("[WebChannel] 9. wechatmp - 个人公众号")
logger.info("[WebChannel] 10. wechatmp_service - 企业公众号")
logger.info("[WebChannel] ✅ Web控制台已运行")
logger.info(f"[WebChannel] 🌐 本地访问: http://localhost:{port}")
if is_public_bind:
@@ -2910,6 +2911,18 @@ class ChannelsHandler:
{"key": "wechatcomapp_port", "label": "Port", "type": "number", "default": 9898},
],
}),
("wechat_kf", {
"label": {"zh": "微信客服", "en": "WeCom Customer Service"},
"icon": "fa-headset",
"color": "emerald",
"fields": [
{"key": "wechat_kf_corp_id", "label": "Corp ID", "type": "text"},
{"key": "wechat_kf_secret", "label": "Secret", "type": "secret"},
{"key": "wechat_kf_token", "label": "Token", "type": "secret"},
{"key": "wechat_kf_aes_key", "label": "AES Key", "type": "secret"},
{"key": "wechat_kf_port", "label": "Port", "type": "number", "default": 9888},
],
}),
("wechatmp", {
"label": {"zh": "公众号", "en": "WeChat MP"},
"icon": "fa-comment-dots",

115
channel/wechat_kf/README.md Normal file
View File

@@ -0,0 +1,115 @@
# 微信客服WeCom Customer Service通道
> 与 `channel/wechatcom/`(企微自建应用)是两个**独立的 CoW 通道**
>
> - 自建应用:**面向企业内部成员**(员工通过企业微信 App 与机器人对话)。
> - 微信客服:**面向外部微信用户**(普通微信用户通过链接/二维码进入对话)。
>
> 但底层都基于"企微自建应用"——本通道是**通过把一个企微自建应用绑定到微信客服账号**来实现 AI 接管对外咨询,详见 [LinkAI 微信客服接入文档](https://docs.link-ai.tech/platform/link-app/wechat-customer-service)。
## 一、接入流程概览
```
┌─────────────────────┐ ┌─────────────────────┐ ┌──────────────────┐
│ 1. 企业微信后台 │ → │ 2. CoW 配置回调 │ → │ 3. 绑定微信客服 │
│ 创建一个自建应用 │ │ 端口 9888 │ │ 账号 │
└─────────────────────┘ └─────────────────────┘ └──────────────────┘
外部微信用户通过
链接/二维码 →
消息 → CoW Bot
```
> **重要**:建议**单独再创建一个企微自建应用**用于微信客服,**不要复用**已经接入员工内部使用的那个 `wechatcom_app` 应用,否则两个通道会争抢同一个回调地址。
## 二、企业微信后台配置
### 1. 创建企微自建应用
进入 企业微信管理后台 → **应用管理****创建应用**
### 2. 收集字段
| 字段 | 来源 | 对应 CoW 配置项 |
|---|---|---|
| 企业IDCorpId | 「我的企业」最下方 | `wechat_kf_corp_id` |
| Secret | 进入应用详情 → 点击「查看」(会推送到管理员手机端,在手机上查看) | `wechat_kf_secret` |
| Token | 应用「接收消息 → 设置API接收」 | `wechat_kf_token` |
| EncodingAESKey | 应用「接收消息 → 设置API接收」 | `wechat_kf_aes_key` |
> AgentId 在本通道**不需要**(消息发送走的是 `cgi-bin/kf/send_msg`,不依赖 agent_id
### 3. 配置回调地址 + 可信 IP
在应用「**接收消息 → 设置API接收**」里填:
- URL`http://<your-host>:9888/wxkf/`(公网必须可达)
- Token / EncodingAESKey与下方 `config.json` 一致
回到应用详情页,把服务器公网 IP 填入「**企业可信IP**」。
### 4. 绑定微信客服账号
进入 企业微信后台 → **微信客服** → 创建客服账号 → **将该账号绑定到上一步创建的企微自建应用**
绑定完成后,进入 **微信客服 → 微信客服账号详情** 页面,在「**接入链接**」一栏:
- 「复制链接」可拿到形如 `https://work.weixin.qq.com/kfid/kfcd83e5896b9ba07be` 的访问链接
- 「生成二维码」可拿到对应二维码
把链接或二维码推给微信客户使用即可。
## 三、CoW 配置(`config.json`
```json
{
"channel_type": "wechat_kf",
"wechat_kf_corp_id": "ww1234567890abcdef",
"wechat_kf_secret": "<企微应用的 Secret>",
"wechat_kf_token": "<接收消息 Token>",
"wechat_kf_aes_key": "<EncodingAESKey>",
"wechat_kf_port": 9888
}
```
| 字段 | 说明 |
|---|---|
| `wechat_kf_corp_id` | 企业 ID |
| `wechat_kf_secret` | **绑定到微信客服**的那个企微自建应用的 Secret |
| `wechat_kf_token` | 该应用「接收消息」配置的 Token |
| `wechat_kf_aes_key` | 该应用「接收消息」配置的 EncodingAESKey |
| `wechat_kf_port` | 监听端口,默认 `9888` |
也支持环境变量:`WECHAT_KF_CORP_ID` / `WECHAT_KF_SECRET` / `WECHAT_KF_TOKEN` / `WECHAT_KF_AES_KEY`
## 四、运行
```bash
python app.py
```
启动后日志里会看到:
```
[wechat_kf] WeCom customer-service channel started
[wechat_kf] Listening on http://0.0.0.0:9888/wxkf/
```
回到企微后台「设置API接收」点击保存——会触发 `GET /wxkf/?...&echostr=...`CoW 通过 `crypto.check_signature` 校验后返回明文 `echostr`,验证成功。
## 五、支持的回复类型
| ReplyType | 是否支持 | 备注 |
|---|---|---|
| `TEXT` / `INFO` / `ERROR` | ✅ | 自动按 2048 字节切片分段发送 |
| `IMAGE`(本地) / `IMAGE_URL`(网络) | ✅ | 大图自动压缩到 10MB 以内 |
| `VOICE` | ✅ | 转 amr 后发送,>60s 自动切片 |
| `VIDEO_URL` | ✅ | 通过临时素材接口上传 |
| `FILE` | ✅ | |
## 六、参考文档
- [LinkAI 微信客服接入文档](https://docs.link-ai.tech/platform/link-app/wechat-customer-service)
- [企业微信开放接口 - 微信客服 - 接收消息](https://developer.work.weixin.qq.com/document/path/94670)
- [企业微信开放接口 - 微信客服 - 发送消息](https://developer.work.weixin.qq.com/document/path/95122)

View File

@@ -0,0 +1,603 @@
# -*- coding=utf-8 -*-
"""
WeCom Customer Service (微信客服) channel for CoW.
Differences from `channel/wechatcom/` (企微自建应用):
1. Audience: external WeChat users (not internal members).
2. Receiver fields: `external_userid` + `open_kfid` instead of a single
member `userid`.
3. Inbound flow: callback only delivers an event token, the actual
message bodies must be pulled via `cgi-bin/kf/sync_msg` with a
persistent cursor. See `wechat_kf_cursor_store.py`.
4. Outbound flow: messages are sent via `cgi-bin/kf/send_msg` (each
request must specify both `touser` and `open_kfid`); wechatpy has
no native helper, so we call the HTTP endpoint directly.
"""
import io
import json
import os
import threading
import time
import xml.etree.ElementTree as ET
from collections import defaultdict
from concurrent.futures import ThreadPoolExecutor
from typing import Optional
import requests
import web
from wechatpy.enterprise import WeChatClient
from wechatpy.enterprise.crypto import WeChatCrypto
from wechatpy.enterprise.exceptions import InvalidCorpIdException
from wechatpy.exceptions import InvalidSignatureException, WeChatClientException
from bridge.context import Context, ContextType
from bridge.reply import Reply, ReplyType
from channel.chat_channel import ChatChannel
from channel.file_cache import get_file_cache
from channel.wechat_kf.wechat_kf_cursor_store import CursorStore
from channel.wechat_kf.wechat_kf_message import WechatKfMessage
from common.log import logger
from common.singleton import singleton
from common.utils import (
compress_imgfile,
fsize,
remove_markdown_symbol,
split_string_by_utf8_length,
)
from config import conf
try:
from voice.audio_convert import any_to_amr, split_audio
except ImportError as e: # voice features optional
logger.debug(
"[wechat_kf] import voice.audio_convert failed, voice will be disabled: {}".format(e)
)
MAX_UTF8_LEN = 2048
KF_API_BASE = "https://qyapi.weixin.qq.com/cgi-bin/kf"
SYNC_MSG_LIMIT = 1000
@singleton
class WechatKfChannel(ChatChannel):
NOT_SUPPORT_REPLYTYPE = []
def __init__(self):
super().__init__()
self.corp_id = conf().get("wechat_kf_corp_id")
self.secret = conf().get("wechat_kf_secret")
self.token = conf().get("wechat_kf_token")
self.aes_key = conf().get("wechat_kf_aes_key")
self._http_server = None
logger.info(
"[wechat_kf] Initializing WeCom customer-service channel, corp_id: {}".format(
self.corp_id
)
)
self.crypto = WeChatCrypto(self.token, self.aes_key, self.corp_id)
# Use the stock wechatpy WeChatClient so that the access_token is
# cached and only refreshed when actually expired (~2h). The local
# `WechatComAppClient` subclass has a broken background refresh
# loop that re-fetches every 60s and a `fetch_access_token()`
# override that may return a dict instead of a string, which
# corrupts URLs and triggers errcode 40014.
self.client = WeChatClient(self.corp_id, self.secret)
# Persist sync_msg cursor under the user's home dir by default,
# so it survives `tmp/` cleanups and cwd changes across restarts.
cursor_path = os.path.expanduser(
conf().get("wechat_kf_cursor_path") or "~/.wechat_kf_cursors.json"
)
self.cursor_store = CursorStore(cursor_path)
# WeCom requires the callback HTTP response to return within ~5s,
# otherwise it retries the same notification. sync_msg pulling
# can easily exceed that, so we dispatch it to a background pool
# and let `Query.POST` reply success immediately.
self._callback_executor = ThreadPoolExecutor(
max_workers=4, thread_name_prefix="wxkf-cb"
)
# Per-open_kfid lock: serialize sync_msg for the same kf account
# so that callback retries (or rapid-fire events) don't race on
# the same cursor and produce duplicate replies.
self._kf_locks: dict = defaultdict(threading.Lock)
self._kf_locks_guard = threading.Lock()
# ------------------------------------------------------------------
# Lifecycle
# ------------------------------------------------------------------
def startup(self):
urls = ("/wxkf/?", "channel.wechat_kf.wechat_kf_channel.Query")
app = web.application(urls, globals(), autoreload=False)
port = conf().get("wechat_kf_port", 9888)
logger.info("[wechat_kf] WeCom customer-service channel started")
logger.info("[wechat_kf] Listening on http://0.0.0.0:{}/wxkf/".format(port))
func = web.httpserver.StaticMiddleware(app.wsgifunc())
func = web.httpserver.LogMiddleware(func)
server = web.httpserver.WSGIServer(("0.0.0.0", port), func)
self._http_server = server
try:
server.start()
except (KeyboardInterrupt, SystemExit):
server.stop()
def stop(self):
if self._http_server:
try:
self._http_server.stop()
logger.info("[wechat_kf] HTTP server stopped")
except Exception as e:
logger.warning(f"[wechat_kf] Error stopping HTTP server: {e}")
self._http_server = None
try:
self._callback_executor.shutdown(wait=False)
except Exception as e:
logger.warning(f"[wechat_kf] Error shutting down callback executor: {e}")
# ------------------------------------------------------------------
# Outbound — implementing the abstract `send` contract
# ------------------------------------------------------------------
def send(self, reply: Reply, context: Context):
receiver = context["receiver"]
msg = context.kwargs.get("msg")
external_userid = context.get("external_userid") or (msg.external_userid if msg else None)
open_kfid = context.get("open_kfid") or (msg.open_kfid if msg else None)
if not external_userid or not open_kfid:
logger.error(
"[wechat_kf] missing external_userid or open_kfid, cannot send: "
f"external_userid={external_userid}, open_kfid={open_kfid}"
)
return
if reply.type in [ReplyType.TEXT, ReplyType.ERROR, ReplyType.INFO]:
reply_text = remove_markdown_symbol(reply.content)
texts = split_string_by_utf8_length(reply_text, MAX_UTF8_LEN)
if len(texts) > 1:
logger.info(
"[wechat_kf] text too long, split into {} parts".format(len(texts))
)
for i, text in enumerate(texts):
self._send_text(external_userid, open_kfid, text)
if i != len(texts) - 1:
time.sleep(0.5)
logger.info("[wechat_kf] Do send text to {}: {}".format(receiver, reply_text))
elif reply.type == ReplyType.VOICE:
file_path = reply.content
try:
amr_file = os.path.splitext(file_path)[0] + ".amr"
any_to_amr(file_path, amr_file)
duration, files = split_audio(amr_file, 60 * 1000)
if len(files) > 1:
logger.info(
"[wechat_kf] voice too long {}s > 60s, split into {} parts".format(
duration / 1000.0, len(files)
)
)
media_ids = []
for path in files:
with open(path, "rb") as f:
response = self.client.media.upload("voice", f)
logger.debug("[wechat_kf] upload voice response: {}".format(response))
media_ids.append(response["media_id"])
except ImportError as e:
logger.error("[wechat_kf] voice conversion failed: {}".format(e))
logger.error("[wechat_kf] please install pydub: pip install pydub")
return
except WeChatClientException as e:
logger.error("[wechat_kf] upload voice failed: {}".format(e))
return
try:
os.remove(file_path)
if amr_file != file_path:
os.remove(amr_file)
except Exception:
pass
for media_id in media_ids:
self._send_voice(external_userid, open_kfid, media_id)
time.sleep(1)
logger.info("[wechat_kf] sendVoice={}, receiver={}".format(reply.content, receiver))
elif reply.type == ReplyType.IMAGE_URL:
img_url = reply.content
pic_res = requests.get(img_url, stream=True)
image_storage = io.BytesIO()
for block in pic_res.iter_content(1024):
image_storage.write(block)
sz = fsize(image_storage)
if sz >= 10 * 1024 * 1024:
logger.info("[wechat_kf] image too large, compressing, sz={}".format(sz))
image_storage = compress_imgfile(image_storage, 10 * 1024 * 1024 - 1)
image_storage.seek(0)
try:
response = self.client.media.upload("image", image_storage)
except WeChatClientException as e:
logger.error("[wechat_kf] upload image failed: {}".format(e))
return
self._send_image(external_userid, open_kfid, response["media_id"])
logger.info("[wechat_kf] sendImage url={}, receiver={}".format(img_url, receiver))
elif reply.type == ReplyType.IMAGE:
image_storage = reply.content
sz = fsize(image_storage)
if sz >= 10 * 1024 * 1024:
logger.info("[wechat_kf] image too large, compressing, sz={}".format(sz))
image_storage = compress_imgfile(image_storage, 10 * 1024 * 1024 - 1)
image_storage.seek(0)
try:
response = self.client.media.upload("image", image_storage)
except WeChatClientException as e:
logger.error("[wechat_kf] upload image failed: {}".format(e))
return
self._send_image(external_userid, open_kfid, response["media_id"])
logger.info("[wechat_kf] sendImage, receiver={}".format(receiver))
elif reply.type == ReplyType.VIDEO_URL:
video_url = reply.content
try:
response = self.client.media.upload(
"video", requests.get(video_url, stream=True).content
)
except WeChatClientException as e:
logger.error("[wechat_kf] upload video failed: {}".format(e))
return
self._send_video(external_userid, open_kfid, response["media_id"])
logger.info("[wechat_kf] sendVideo url={}, receiver={}".format(video_url, receiver))
elif reply.type == ReplyType.FILE:
file_path = reply.content
try:
with open(file_path, "rb") as f:
response = self.client.media.upload(
"file", (os.path.basename(file_path), f.read())
)
except WeChatClientException as e:
logger.error("[wechat_kf] upload file failed: {}".format(e))
return
self._send_file(external_userid, open_kfid, response["media_id"])
logger.info("[wechat_kf] sendFile={}, receiver={}".format(file_path, receiver))
else:
logger.warning("[wechat_kf] unsupported reply type: {}".format(reply.type))
# ------------------------------------------------------------------
# Inbound — pull messages by cursor
# ------------------------------------------------------------------
def _get_kf_lock(self, open_kfid: str) -> threading.Lock:
with self._kf_locks_guard:
return self._kf_locks[open_kfid]
def submit_callback(self, token: str, open_kfid: str):
"""
Async entry point used by the HTTP handler. Submits the actual
sync_msg pulling to a background thread so the callback response
can return within WeCom's 5s deadline.
"""
try:
self._callback_executor.submit(self._run_callback, token, open_kfid)
except RuntimeError as e:
# Executor may be shut down during process exit; fall back
# to inline execution so we don't silently drop the event.
logger.warning(f"[wechat_kf] executor unavailable, run inline: {e}")
self._run_callback(token, open_kfid)
def _run_callback(self, token: str, open_kfid: str):
# Block on the per-kfid lock so retried callbacks queue up
# behind the in-flight one. The queued worker will then call
# sync_msg with the (already advanced) cursor, which is cheap
# when there is nothing new and still picks up any messages
# that arrived after the previous worker's last pull.
lock = self._get_kf_lock(open_kfid)
with lock:
try:
self.consume_callback(token, open_kfid)
except Exception as e:
logger.exception(f"[wechat_kf] consume_callback error: {e}")
def consume_callback(self, token: str, open_kfid: str):
"""
Called from the HTTP `Query.POST` handler whenever WeCom notifies
us that there are new messages for `open_kfid`. Pulls all new
messages via sync_msg and feeds them into `produce()`.
"""
existing_cursor = self.cursor_store.get(open_kfid)
# First-time bootstrap: always skip history, otherwise WeCom would
# replay up to 14 days of messages on the very first callback and
# flood every user with auto-replies.
if not existing_cursor:
self._initialize_cursor(token, open_kfid)
return
msgs = self._pull_messages(token, open_kfid, existing_cursor)
if not msgs:
return
file_cache = get_file_cache()
for raw in msgs:
try:
kf_msg = WechatKfMessage(msg=raw, client=self.client)
except NotImplementedError as e:
logger.debug("[wechat_kf] {}".format(e))
continue
session_id = kf_msg.from_user_id
# Cache lone images/files and wait for the user's follow-up
# text. Agent mode never reads memory.USER_IMAGE_CACHE, so
# without this the attachment is effectively lost.
if kf_msg.ctype in (ContextType.IMAGE, ContextType.FILE):
ftype = "image" if kf_msg.ctype == ContextType.IMAGE else "file"
try:
kf_msg.prepare() # download to local tmp path
file_cache.add(session_id, kf_msg.content, file_type=ftype)
logger.info(
"[wechat_kf] {} cached for session {}: {}".format(
ftype, session_id, kf_msg.content
)
)
except Exception as e:
logger.warning(f"[wechat_kf] cache {ftype} failed: {e}")
continue
# On a text turn, attach any pending images/files as references
# so the downstream agent can pick them up via the text content.
# Paths are already under agent_workspace/tmp (see
# WechatKfMessage._get_tmp_dir), so a relative ref also works.
if kf_msg.ctype == ContextType.TEXT:
cached_files = file_cache.get(session_id)
if cached_files:
refs = []
for fi in cached_files:
ftype, fpath = fi["type"], fi["path"]
if ftype == "image":
refs.append(f"[图片: {fpath}]")
else:
refs.append(f"[文件: {fpath}]")
kf_msg.content = kf_msg.content + "\n" + "\n".join(refs)
file_cache.clear(session_id)
context = self._compose_context(
kf_msg.ctype,
kf_msg.content,
isgroup=False,
msg=kf_msg,
)
if context:
self.produce(context)
time.sleep(0.05) # tiny gap between messages of the same batch
def _initialize_cursor(self, token: str, open_kfid: str):
"""
Drain all current messages for this `open_kfid` without producing
any context, just to advance the cursor to "now". This prevents
a fresh deployment from replying to up to ~14 days of history.
"""
next_cursor = ""
total_skipped = 0
while True:
data = self._call_sync_msg(token, open_kfid, next_cursor)
if data is None:
break
msg_list = data.get("msg_list") or []
total_skipped += len(msg_list)
cursor_after = data.get("next_cursor") or ""
if cursor_after:
self.cursor_store.set(open_kfid, cursor_after)
if not data.get("has_more"):
break
if not cursor_after or cursor_after == next_cursor:
break
next_cursor = cursor_after
logger.info(
"[wechat_kf] first-start bootstrap finished for open_kfid={}, "
"skipped {} historical messages".format(open_kfid, total_skipped)
)
def _pull_messages(self, token: str, open_kfid: str, next_cursor: Optional[str]) -> list:
"""Loop sync_msg until `has_more` is false. Returns raw msg dicts."""
collected = []
cursor = next_cursor or ""
while True:
data = self._call_sync_msg(token, open_kfid, cursor)
if data is None:
break
for item in data.get("msg_list") or []:
# Only consume messages from external users; ignore replies
# generated by our own kf account, otherwise we would loop
# back into ourselves.
if not item.get("external_userid"):
continue
if item.get("msgtype") in ("text", "image", "voice", "file"):
collected.append(item)
cursor_after = data.get("next_cursor") or ""
if cursor_after:
self.cursor_store.set(open_kfid, cursor_after)
if not data.get("has_more"):
break
if not cursor_after or cursor_after == cursor:
break
cursor = cursor_after
if collected:
collected = _dedup_image_text_pair(collected)
logger.info(
"[wechat_kf] pulled {} messages for open_kfid={}".format(len(collected), open_kfid)
)
return collected
def _call_sync_msg(self, token: str, open_kfid: str, cursor: str) -> Optional[dict]:
# `client.access_token` is the cached string property; do not use
# `fetch_access_token()` here — wechatpy returns the raw response
# dict from that call, which corrupts the query string.
url = f"{KF_API_BASE}/sync_msg?access_token={self.client.access_token}"
payload = {
"token": token,
"open_kfid": open_kfid,
"limit": SYNC_MSG_LIMIT,
}
if cursor:
payload["cursor"] = cursor
try:
resp = requests.post(url, json=payload, timeout=10).json()
except Exception as e:
logger.error(f"[wechat_kf] sync_msg request failed: {e}")
return None
if resp.get("errcode") != 0:
logger.error(
f"[wechat_kf] sync_msg errcode={resp.get('errcode')}, "
f"errmsg={resp.get('errmsg')}, open_kfid={open_kfid}"
)
return None
return resp
# ------------------------------------------------------------------
# Outbound HTTP wrappers (kf/send_msg)
# ------------------------------------------------------------------
def _post_send_msg(self, payload: dict) -> dict:
url = f"{KF_API_BASE}/send_msg?access_token={self.client.access_token}"
try:
resp = requests.post(url, json=payload, timeout=10).json()
except Exception as e:
logger.error(f"[wechat_kf] send_msg request failed: {e}")
return {"errcode": -1, "errmsg": str(e)}
if resp.get("errcode") != 0:
logger.error(f"[wechat_kf] send_msg failed, payload={payload}, resp={resp}")
return resp
def _send_text(self, external_userid: str, open_kfid: str, content: str) -> dict:
return self._post_send_msg({
"touser": external_userid,
"open_kfid": open_kfid,
"msgtype": "text",
"text": {"content": content},
})
def _send_image(self, external_userid: str, open_kfid: str, media_id: str) -> dict:
return self._post_send_msg({
"touser": external_userid,
"open_kfid": open_kfid,
"msgtype": "image",
"image": {"media_id": media_id},
})
def _send_voice(self, external_userid: str, open_kfid: str, media_id: str) -> dict:
return self._post_send_msg({
"touser": external_userid,
"open_kfid": open_kfid,
"msgtype": "voice",
"voice": {"media_id": media_id},
})
def _send_video(self, external_userid: str, open_kfid: str, media_id: str) -> dict:
return self._post_send_msg({
"touser": external_userid,
"open_kfid": open_kfid,
"msgtype": "video",
"video": {"media_id": media_id},
})
def _send_file(self, external_userid: str, open_kfid: str, media_id: str) -> dict:
return self._post_send_msg({
"touser": external_userid,
"open_kfid": open_kfid,
"msgtype": "file",
"file": {"media_id": media_id},
})
def _send_link(self, external_userid: str, open_kfid: str, link_data: dict) -> dict:
return self._post_send_msg({
"touser": external_userid,
"open_kfid": open_kfid,
"msgtype": "link",
"link": link_data,
})
def _dedup_image_text_pair(messages: list) -> list:
"""
A WeChat user often sends an image immediately followed by a text
question (e.g. "what's in this picture?"). Only when the batch is
exactly that 2-message image+text pair within a 5s window do we
collapse it into a single [image, text] turn. Otherwise return
every message so rapid-fire texts/images are all processed —
cursor freshness is already guaranteed by sync_msg.
"""
if not messages:
return []
if len(messages) == 2:
a, b = messages
types = {a["msgtype"], b["msgtype"]}
if types == {"image", "text"} and abs(a["send_time"] - b["send_time"]) <= 5:
img = a if a["msgtype"] == "image" else b
txt = b if a["msgtype"] == "image" else a
return [img, txt]
return messages
# ----------------------------------------------------------------------
# HTTP handlers (web.py)
# ----------------------------------------------------------------------
class Query:
def GET(self):
channel = WechatKfChannel()
params = web.input()
logger.info("[wechat_kf] verify params: {}".format(params))
try:
signature = params.msg_signature
timestamp = params.timestamp
nonce = params.nonce
echostr = params.echostr
echostr = channel.crypto.check_signature(signature, timestamp, nonce, echostr)
except (InvalidSignatureException, InvalidCorpIdException):
raise web.Forbidden()
return echostr
def POST(self):
channel = WechatKfChannel()
params = web.input()
try:
signature = params.msg_signature
timestamp = params.timestamp
nonce = params.nonce
raw_body = web.data()
decrypted = channel.crypto.decrypt_message(raw_body, signature, timestamp, nonce)
except (InvalidSignatureException, InvalidCorpIdException) as e:
logger.warning(f"[wechat_kf] invalid signature: {e}")
raise web.Forbidden()
# We need the Token + OpenKfId fields from the inner XML to call
# sync_msg. wechatpy's parsed object exposes neither, so we parse
# the raw XML directly.
try:
root = ET.fromstring(decrypted)
except ET.ParseError as e:
logger.error(f"[wechat_kf] xml parse error: {e}")
return "success"
msg_type = (root.findtext("MsgType") or "").strip()
event = (root.findtext("Event") or "").strip()
if msg_type != "event" or event != "kf_msg_or_event":
logger.debug(
f"[wechat_kf] ignored callback msg_type={msg_type}, event={event}"
)
return "success"
token = root.findtext("Token") or ""
open_kfid = root.findtext("OpenKfId") or ""
if not token or not open_kfid:
logger.warning(
f"[wechat_kf] callback missing token or open_kfid: {decrypted}"
)
return "success"
# Hand off to a background worker — WeCom requires the callback
# to return success within ~5 seconds, otherwise it will retry
# and we may race the same cursor window into duplicate replies.
channel.submit_callback(token, open_kfid)
return "success"

View File

@@ -0,0 +1,80 @@
# -*- coding=utf-8 -*-
"""
Local-file based persistence for WeCom customer-service `next_cursor`.
Why we need this:
The WeCom customer-service (微信客服) callback only notifies us that
"new messages exist". To actually fetch them we must call the
`cgi-bin/kf/sync_msg` endpoint with a `cursor` so that we only get
messages newer than the previously processed one. If we lose this
cursor (e.g. on process restart) WeCom will replay up to ~14 days of
history, which would cause the bot to flood users with duplicate
replies.
This implementation deliberately avoids any external dependency
(no Redis / no DB) — a single JSON file under the project's tmp dir is
enough for a CoW-style single-process deployment.
"""
import json
import os
import threading
from typing import Optional
from common.log import logger
class CursorStore:
"""Thread-safe per-`open_kfid` cursor store backed by a JSON file."""
def __init__(self, file_path: str):
self._file_path = file_path
self._lock = threading.Lock()
self._data = self._load()
def _load(self) -> dict:
try:
if os.path.exists(self._file_path):
with open(self._file_path, "r", encoding="utf-8") as f:
return json.load(f) or {}
except Exception as e:
logger.warning(f"[wechat_kf] failed to load cursor file {self._file_path}: {e}")
return {}
def _flush_locked(self):
# Atomic write: write to *.tmp first then rename, avoid corruption on crash.
tmp_path = self._file_path + ".tmp"
try:
os.makedirs(os.path.dirname(self._file_path) or ".", exist_ok=True)
with open(tmp_path, "w", encoding="utf-8") as f:
json.dump(self._data, f, ensure_ascii=False)
os.replace(tmp_path, self._file_path)
# Tighten permissions: cursor file lives in $HOME, restrict to owner.
# No-op on Windows.
try:
os.chmod(self._file_path, 0o600)
except Exception:
pass
except Exception as e:
logger.warning(f"[wechat_kf] failed to flush cursor file {self._file_path}: {e}")
try:
if os.path.exists(tmp_path):
os.remove(tmp_path)
except Exception:
pass
def get(self, open_kfid: str) -> Optional[str]:
with self._lock:
return self._data.get(open_kfid)
def set(self, open_kfid: str, cursor: str):
if not cursor:
return
with self._lock:
if self._data.get(open_kfid) == cursor:
return
self._data[open_kfid] = cursor
self._flush_locked()
def has(self, open_kfid: str) -> bool:
with self._lock:
return open_kfid in self._data

View File

@@ -0,0 +1,134 @@
# -*- coding=utf-8 -*-
"""
Adapter that turns a single `sync_msg` item from WeCom customer-service
into a CoW `ChatMessage` object.
"""
import os
import re
from wechatpy.enterprise import WeChatClient
from bridge.context import ContextType
from channel.chat_message import ChatMessage
from common.log import logger
from common.utils import expand_path
from config import conf
def _get_tmp_dir() -> str:
"""Save under agent_workspace/tmp/ so agent tools (e.g. `read`) can
resolve a relative path like `tmp/xxx.pdf` against their own
workspace root. Mirrors the convention used by weixin / wecom_bot.
"""
ws_root = expand_path(conf().get("agent_workspace", "~/cow"))
tmp_dir = os.path.join(ws_root, "tmp")
os.makedirs(tmp_dir, exist_ok=True)
return tmp_dir
def _extract_filename(content_disposition: str) -> str:
"""Best-effort parse of `filename` / `filename*` from a Content-Disposition
header. Returns '' when nothing usable is found."""
if not content_disposition:
return ""
# RFC 5987 form: filename*=UTF-8''xxx
m = re.search(r"filename\*=(?:[^'\"]*'[^']*'\s*)?([^;]+)", content_disposition)
if m:
try:
from urllib.parse import unquote
return unquote(m.group(1).strip().strip('"'))
except Exception:
return m.group(1).strip().strip('"')
m = re.search(r'filename\s*=\s*"?([^";]+)"?', content_disposition)
return m.group(1).strip() if m else ""
class WechatKfMessage(ChatMessage):
"""
msg structure (from cgi-bin/kf/sync_msg):
{
"msgid": "...",
"send_time": 1700000000,
"origin": 3,
"msgtype": "text" | "image" | "voice" | ...,
"open_kfid": "wkxxxx",
"external_userid": "wmxxxx",
"text": {"content": "..."},
"image": {"media_id": "..."},
"voice": {"media_id": "..."},
...
}
"""
def __init__(self, msg: dict, client: WeChatClient = None, is_group: bool = False):
# NOTE: skip parent constructor because it expects a wechatpy parsed
# message object, while here we receive a raw dict from sync_msg.
super().__init__(msg)
self.is_group = is_group
self.msg_id = msg.get("msgid")
self.create_time = msg.get("send_time")
self.origin = msg.get("origin")
self.msgtype = msg.get("msgtype")
self.open_kfid = msg.get("open_kfid")
self.external_userid = msg.get("external_userid")
if self.msgtype == "text":
self.ctype = ContextType.TEXT
self.content = msg.get("text", {}).get("content", "")
elif self.msgtype == "image":
self.ctype = ContextType.IMAGE
media_id = msg.get("image", {}).get("media_id", "")
self.content = os.path.join(_get_tmp_dir(), media_id + ".jpg")
def download_image():
response = client.media.download(media_id)
if response.status_code == 200:
with open(self.content, "wb") as f:
f.write(response.content)
else:
logger.info(f"[wechat_kf] Failed to download image, {response.content}")
self._prepare_fn = download_image
elif self.msgtype == "voice":
self.ctype = ContextType.VOICE
media_id = msg.get("voice", {}).get("media_id", "")
# WeCom returns amr by default; downstream voice pipeline will convert.
self.content = os.path.join(_get_tmp_dir(), media_id + ".amr")
def download_voice():
response = client.media.download(media_id)
if response.status_code == 200:
with open(self.content, "wb") as f:
f.write(response.content)
else:
logger.info(f"[wechat_kf] Failed to download voice, {response.content}")
self._prepare_fn = download_voice
elif self.msgtype == "file":
self.ctype = ContextType.FILE
media_id = msg.get("file", {}).get("media_id", "")
# Provisional path; rewritten in download_file() once we have
# the original filename from Content-Disposition.
self.content = os.path.join(_get_tmp_dir(), media_id)
def download_file():
response = client.media.download(media_id)
if response.status_code == 200:
filename = _extract_filename(
response.headers.get("Content-Disposition", "")
) or media_id
self.content = os.path.join(_get_tmp_dir(), filename)
with open(self.content, "wb") as f:
f.write(response.content)
else:
logger.info(f"[wechat_kf] Failed to download file, {response.content}")
self._prepare_fn = download_file
else:
raise NotImplementedError(
f"[wechat_kf] Unsupported message type: {self.msgtype}"
)
self.from_user_id = self.external_userid
self.to_user_id = self.open_kfid
self.other_user_id = self.external_userid

View File

@@ -103,14 +103,21 @@ class Query:
task_running = True
waiting_until = request_time + 4
while time.time() < waiting_until:
if from_user in channel.running:
time.sleep(0.1)
else:
if from_user not in channel.running:
task_running = False
break
# Task still running, but if it has already produced cached
# segments (e.g. multi-turn thinking output), return them now
# instead of forcing the user to wait for the whole task. The
# remaining segments are fetched by the user's next message.
if channel.cache_dict.get(from_user):
break
time.sleep(0.1)
reply_text = ""
if task_running:
# Only fall back to retry / "thinking" hint when the task is still
# running AND there is nothing cached to send yet.
if task_running and not channel.cache_dict.get(from_user):
if request_cnt < 3:
# waiting for timeout (the POST request will be closed by Wechat official server)
time.sleep(2)
@@ -131,8 +138,22 @@ class Query:
# Only one request can access to the cached data
try:
(reply_type, reply_content) = channel.cache_dict[from_user].pop(0)
if not channel.cache_dict[from_user]: # If popping the message makes the list empty, delete the user entry from cache
# WeChat passive reply allows only a single reply per request.
# To avoid forcing the user to send an extra message for every
# segment of multi-turn agent output, drain all consecutive
# cached text segments at once and merge them into one reply.
# Media (voice/image) can only be returned one at a time, so it
# stops the merge and is returned on its own.
cached = channel.cache_dict[from_user]
if cached[0][0] == "text":
reply_type = "text"
merged_parts = []
while cached and cached[0][0] == "text":
merged_parts.append(cached.pop(0)[1])
reply_content = "\n\n".join(merged_parts)
else:
(reply_type, reply_content) = cached.pop(0)
if not channel.cache_dict[from_user]: # If draining empties the list, delete the user entry from cache
del channel.cache_dict[from_user]
except IndexError:
return "success"

View File

@@ -134,10 +134,16 @@ class WechatMPChannel(ChatChannel):
elif reply.type == ReplyType.IMAGE_URL: # 从网络下载图片
img_url = reply.content
pic_res = requests.get(img_url, stream=True)
image_storage = io.BytesIO()
for block in pic_res.iter_content(1024):
image_storage.write(block)
if img_url.startswith("file://") or os.path.isfile(img_url):
# Local file produced by the agent (e.g. a generated image)
local_path = img_url[len("file://"):] if img_url.startswith("file://") else img_url
with open(local_path, "rb") as f:
image_storage.write(f.read())
else:
pic_res = requests.get(img_url, stream=True)
for block in pic_res.iter_content(1024):
image_storage.write(block)
image_storage.seek(0)
image_type = imghdr.what(image_storage)
filename = receiver + "-" + str(context["msg"].msg_id) + "." + image_type
@@ -258,10 +264,16 @@ class WechatMPChannel(ChatChannel):
logger.info("[wechatmp] Do send voice to {}".format(receiver))
elif reply.type == ReplyType.IMAGE_URL: # 从网络下载图片
img_url = reply.content
pic_res = requests.get(img_url, stream=True)
image_storage = io.BytesIO()
for block in pic_res.iter_content(1024):
image_storage.write(block)
if img_url.startswith("file://") or os.path.isfile(img_url):
# Local file produced by the agent (e.g. a generated image)
local_path = img_url[len("file://"):] if img_url.startswith("file://") else img_url
with open(local_path, "rb") as f:
image_storage.write(f.read())
else:
pic_res = requests.get(img_url, stream=True)
for block in pic_res.iter_content(1024):
image_storage.write(block)
image_storage.seek(0)
image_type = imghdr.what(image_storage)
filename = receiver + "-" + str(context["msg"].msg_id) + "." + image_type

View File

@@ -244,5 +244,6 @@ DINGTALK = "dingtalk"
WECOM_BOT = "wecom_bot"
QQ = "qq"
WEIXIN = "weixin"
WECHAT_KF = "wechat_kf"
TELEGRAM = "telegram"
SLACK = "slack"

View File

@@ -151,6 +151,13 @@ available_setting = {
"wechatcomapp_secret": "", # 企业微信app的secret
"wechatcomapp_agent_id": "", # 企业微信app的agent_id
"wechatcomapp_aes_key": "", # 企业微信app的aes_key
# 微信客服(wechat_kf)的配置
"wechat_kf_corp_id": "", # 微信客服所在企业的corp_id
"wechat_kf_token": "", # 微信客服回调token
"wechat_kf_port": 9888, # 微信客服回调服务端口
"wechat_kf_secret": "", # 微信客服应用的secret
"wechat_kf_aes_key": "", # 微信客服回调aes_key
"wechat_kf_cursor_path": "~/.wechat_kf_cursors.json", # 微信客服sync_msg游标持久化文件路径
# 飞书配置
"feishu_port": 80, # 飞书bot监听端口仅webhook模式需要
"feishu_app_id": "", # 飞书机器人应用APP Id
@@ -183,7 +190,7 @@ available_setting = {
# chatgpt指令自定义触发词
"clear_memory_commands": ["#清除记忆"], # 重置会话指令,必须以#开头
# channel配置
"channel_type": "", # 通道类型,支持多渠道同时运行。单个: "feishu",多个: "feishu, dingtalk" 或 ["feishu", "dingtalk"]。可选值: web,feishu,dingtalk,wecom_bot,weixin,wechatmp,wechatmp_service,wechatcom_app,telegram,slack
"channel_type": "", # 通道类型,支持多渠道同时运行。单个: "feishu",多个: "feishu, dingtalk" 或 ["feishu", "dingtalk"]。可选值: web,feishu,dingtalk,wecom_bot,weixin,wechatmp,wechatmp_service,wechatcom_app,wechat_kf,telegram,slack
"web_console": True, # 是否自动启动Web控制台默认启动。设为False可禁用
"subscribe_msg": "", # 订阅消息, 支持: wechatmp, wechatmp_service, wechatcom_app
"debug": False, # 是否开启debug模式开启后会打印更多日志
@@ -435,6 +442,11 @@ def load_config():
"wechatmp_app_secret": "WECHATMP_APP_SECRET",
"wechatcomapp_agent_id": "WECHATCOMAPP_AGENT_ID",
"wechatcomapp_secret": "WECHATCOMAPP_SECRET",
"wechatcom_corp_id": "WECHATCOM_CORP_ID",
"wechat_kf_corp_id": "WECHAT_KF_CORP_ID",
"wechat_kf_secret": "WECHAT_KF_SECRET",
"wechat_kf_token": "WECHAT_KF_TOKEN",
"wechat_kf_aes_key": "WECHAT_KF_AES_KEY",
"qq_app_id": "QQ_APP_ID",
"qq_app_secret": "QQ_APP_SECRET",
"weixin_token": "WEIXIN_TOKEN",

130
docs/channels/wechat-kf.mdx Normal file
View File

@@ -0,0 +1,130 @@
---
title: 微信客服
description: 将 CowAgent 接入微信客服WeCom Customer Service
---
通过把企业微信自建应用绑定到「微信客服」账号CowAgent 可以接管来自外部微信用户的客服咨询,并可在小程序、公众号、视频号及视频号小店等场景中通过链接或二维码触达微信用户。
<Note>
微信客服只能使用 Docker 部署或服务器 Python 部署,需要公网可达的回调地址,不支持本地运行模式。
</Note>
## 一、准备
需要的资源:
1. 一台服务器(有公网 IP
2. 注册并已认证的企业微信
3. 已开通「微信客服」能力
<Note>
建议**单独再创建一个企微自建应用**用于微信客服,不要复用已有的 `wechatcom_app` 应用,否则两个通道会争抢同一个回调地址。
</Note>
## 二、创建企业微信自建应用
1. 在 [企业微信管理后台](https://work.weixin.qq.com/wework_admin/frame#apps),点击 **应用管理 → 创建应用**
<img src="https://cdn.link-ai.tech/doc/20260228103156.png" width="480"/>
2. 点击 **我的企业**,在最下方获取 **企业ID**(后续填写到 `wechat_kf_corp_id`
<img src="https://cdn.link-ai.tech/doc/wechatcom-hosting-companyid.png" width="600"/>
3. 进入上一步创建的应用,点击 Secret 旁的「**查看**」Secret 会推送到管理员手机端的企业微信里查看:
<img src="https://cdn.link-ai.tech/doc/wechatcom-hosting-agent-secret.png" width="600"/>
4. 进入应用 **接收消息 → 设置API接收**,点击右侧「**随机获取**」生成 **Token** 和 **EncodingAESKey** 并保存:
<img src="https://img-1317903499.cos.ap-guangzhou.myqcloud.com/docs/wechatcom-hosting-token-aeskey.jpg" width="600"/>
<Note>
此时保存 API 接收配置会失败,因为程序还未启动,等项目运行后再回来保存。
</Note>
## 三、配置和运行
填入上一步收集到的 4 个字段Corp ID / Secret / Token / EncodingAESKey
<Tabs>
<Tab title="Web 控制台">
启动 Cow 项目后打开 Web 控制台,选择 **通道** 菜单,点击 **接入通道**,选择 **微信客服**,依次填入 Corp ID / Secret / Token / AES Key端口默认 9888可改点击接入即可。
<img src="https://cdn.link-ai.tech/doc/cow-weixinkefu-web-control.png" width="800"/>
</Tab>
<Tab title="配置文件">
在 `config.json` 中添加以下配置(各参数与企业微信后台的对应关系见上方截图):
```json
{
"channel_type": "wechat_kf",
"wechat_kf_corp_id": "YOUR_CORP_ID",
"wechat_kf_secret": "YOUR_SECRET",
"wechat_kf_token": "YOUR_TOKEN",
"wechat_kf_aes_key": "YOUR_AES_KEY",
"wechat_kf_port": 9888
}
```
| 参数 | 说明 |
| --- | --- |
| `wechat_kf_corp_id` | 企业 ID |
| `wechat_kf_secret` | 绑定到微信客服的那个企微自建应用的 Secret |
| `wechat_kf_token` | API 接收配置中的 Token |
| `wechat_kf_aes_key` | API 接收配置中的 EncodingAESKey |
| `wechat_kf_port` | 监听端口,默认 9888 |
</Tab>
</Tabs>
接入完成后启动程序Web 控制台方式会自动重启),日志中出现 `Listening on http://0.0.0.0:9888/wxkf/` 表示运行成功,需要将该端口对外开放(如在云服务器安全组中放行)。
接着回到企业微信「接收消息 → 设置API接收」将回调 URL 填为 `http://<your-host>:9888/wxkf/`,点击保存。保存成功后还需将服务器 IP 添加到 **企业可信IP** 中,否则无法收发消息:
<img src="https://cdn.link-ai.tech/doc/wechat-com_config.png" width="600"/>
<img src="https://cdn.link-ai.tech/doc/20260228103224.png" width="520"/>
<Warning>
如遇到 URL 配置回调不通过或配置失败:
1. 确保服务器防火墙关闭且安全组放行监听端口(默认 9888
2. 仔细检查 Token、Secret、EncodingAESKey 等参数配置是否一致URL 格式是否正确
3. 认证企业微信需要配置与主体一致的备案域名
</Warning>
## 四、绑定微信客服账号
进入企业微信管理后台 **微信客服** 页面,创建客服账号并与上一步的企微自建应用绑定:
<img src="https://img-1317903499.cos.ap-guangzhou.myqcloud.com/docs/wxcustomer-hosting-step1.jpg" width="600"/>
<img src="https://img-1317903499.cos.ap-guangzhou.myqcloud.com/docs/wxcustomer-hosting-step2.jpg" width="600"/>
<img src="https://img-1317903499.cos.ap-guangzhou.myqcloud.com/docs/wxcustomer-hosting-step3.jpg" width="600"/>
绑定完成后,进入 **微信客服 → 微信客服账号详情**,在「**接入链接**」一栏:
- 点击「**复制链接**」可获得形如 `https://work.weixin.qq.com/kfid/kfcd83e5896b9ba07be` 的访问链接
- 点击「**生成二维码**」可获得对应的二维码
把链接或二维码推给微信客户使用即可:
<img src="https://cdn.link-ai.tech/doc/wechat-customer-service_use.png" width="600"/>
## 五、使用
微信用户通过链接或二维码进入客服对话后,即可与 AI 进行多轮对话,支持文本、图片、语音等消息:
<img src="https://img-1317903499.cos.ap-guangzhou.myqcloud.com/docs/wxcustomer-hosting-chat-demo.jpg" width="900"/>
除此之外,基于微信生态官方能力,还可将微信客服应用在公众号、小程序、视频号及视频号小店等场景,详情可查看企业微信管理后台 [微信客服 → 接入场景](https://work.weixin.qq.com/wework_admin/frame#/app/servicer) 的相关说明:
<img src="https://img-1317903499.cos.ap-guangzhou.myqcloud.com/docs/wxcustomer-hosting-interface-demo.png" width="800"/>
## 常见问题
需要确保已安装以下依赖:
```bash
pip install websocket-client pycryptodome
```

View File

@@ -196,6 +196,7 @@
"channels/wecom-bot",
"channels/qq",
"channels/wecom",
"channels/wechat-kf",
"channels/wechatmp",
"channels/telegram",
"channels/slack"
@@ -401,6 +402,7 @@
"en/channels/wecom-bot",
"en/channels/qq",
"en/channels/wecom",
"en/channels/wechat-kf",
"en/channels/wechatmp",
"en/channels/telegram",
"en/channels/slack"
@@ -606,6 +608,7 @@
"ja/channels/wecom-bot",
"ja/channels/qq",
"ja/channels/wecom",
"ja/channels/wechat-kf",
"ja/channels/wechatmp",
"ja/channels/telegram",
"ja/channels/slack"

View File

@@ -0,0 +1,130 @@
---
title: WeCom Customer Service
description: Integrate CowAgent into WeCom Customer Service (微信客服)
---
By binding a WeCom custom enterprise app to a WeCom Customer Service (微信客服) account, CowAgent can take over inbound inquiries from external WeChat users and serve them through links or QR codes embedded in WeChat Mini Programs, Official Accounts, Video Channels, and Video Channel stores.
<Note>
WeCom Customer Service only supports Docker deployment or server Python deployment. A publicly reachable callback URL is required; local run mode is not supported.
</Note>
## 1. Prerequisites
Required resources:
1. A server with a public IP
2. A registered and verified WeCom account
3. WeCom Customer Service capability enabled
<Note>
It is recommended to create a **dedicated** WeCom custom app for Customer Service rather than reusing the existing `wechatcom_app` one — otherwise the two channels will compete for the same callback URL.
</Note>
## 2. Create a WeCom Custom App
1. In the [WeCom Admin Console](https://work.weixin.qq.com/wework_admin/frame#apps), go to **Application Management → Create Application**:
<img src="https://cdn.link-ai.tech/doc/20260228103156.png" width="480"/>
2. Click **My Enterprise** and find the **Corp ID** at the bottom of the page (it goes into `wechat_kf_corp_id`):
<img src="https://cdn.link-ai.tech/doc/wechatcom-hosting-companyid.png" width="600"/>
3. Open the app you just created and click **"View"** next to Secret. The Secret will be pushed to the admin's phone via the WeCom app, where it can be viewed:
<img src="https://cdn.link-ai.tech/doc/wechatcom-hosting-agent-secret.png" width="600"/>
4. Open the app's **Receive Messages → Set API Reception** page, click **"Random Generate"** to generate the **Token** and **EncodingAESKey**, and save them:
<img src="https://img-1317903499.cos.ap-guangzhou.myqcloud.com/docs/wechatcom-hosting-token-aeskey.jpg" width="600"/>
<Note>
Saving the API reception configuration will fail at this point because the program has not started yet. Come back to save it after the project is running.
</Note>
## 3. Configuration and Run
Fill in the 4 fields collected from the previous step (Corp ID / Secret / Token / EncodingAESKey):
<Tabs>
<Tab title="Web Console">
Start the Cow project and open the Web Console. Go to the **Channels** menu, click **Connect**, choose **WeCom Customer Service**, fill in Corp ID / Secret / Token / AES Key (port defaults to 9888, configurable), and click Connect.
<img src="https://cdn.link-ai.tech/doc/cow-weixinkefu-web-control.png" width="800"/>
</Tab>
<Tab title="Config File">
Add the following configuration to `config.json` (each parameter maps to a field shown in the screenshots above):
```json
{
"channel_type": "wechat_kf",
"wechat_kf_corp_id": "YOUR_CORP_ID",
"wechat_kf_secret": "YOUR_SECRET",
"wechat_kf_token": "YOUR_TOKEN",
"wechat_kf_aes_key": "YOUR_AES_KEY",
"wechat_kf_port": 9888
}
```
| Parameter | Description |
| --- | --- |
| `wechat_kf_corp_id` | Corp ID |
| `wechat_kf_secret` | Secret of the WeCom custom app bound to Customer Service |
| `wechat_kf_token` | Token from the API reception config |
| `wechat_kf_aes_key` | EncodingAESKey from the API reception config |
| `wechat_kf_port` | Listening port, default 9888 |
</Tab>
</Tabs>
After connecting, start the program (the Web Console method restarts the channel automatically). When the log shows `Listening on http://0.0.0.0:9888/wxkf/`, the program is running successfully. You need to open this port externally (e.g., allow it in the cloud server security group).
Then go back to **Receive Messages → Set API Reception** in the WeCom console and set the callback URL to `http://<your-host>:9888/wxkf/`, then click Save. After saving successfully, you also need to add the server IP to **Enterprise Trusted IPs**, otherwise messages cannot be sent or received:
<img src="https://cdn.link-ai.tech/doc/wechat-com_config.png" width="600"/>
<img src="https://cdn.link-ai.tech/doc/20260228103224.png" width="520"/>
<Warning>
If URL verification fails or the configuration is unsuccessful:
1. Ensure the server firewall is disabled and the security group allows the listening port (default 9888)
2. Carefully check that Token, Secret, EncodingAESKey and other parameters are consistent, and the URL format is correct
3. Verified WeCom accounts must use a filed domain matching the entity
</Warning>
## 4. Bind a WeCom Customer Service Account
In the WeCom Admin Console, go to **WeCom Customer Service**, create a customer service account, and bind it to the custom app you created above:
<img src="https://img-1317903499.cos.ap-guangzhou.myqcloud.com/docs/wxcustomer-hosting-step1.jpg" width="600"/>
<img src="https://img-1317903499.cos.ap-guangzhou.myqcloud.com/docs/wxcustomer-hosting-step2.jpg" width="600"/>
<img src="https://img-1317903499.cos.ap-guangzhou.myqcloud.com/docs/wxcustomer-hosting-step3.jpg" width="600"/>
After binding, go to **WeCom Customer Service → Account Details**, and under **"Access Link"**:
- Click **"Copy Link"** to get an access link like `https://work.weixin.qq.com/kfid/kfcd83e5896b9ba07be`
- Click **"Generate QR Code"** to get the corresponding QR code
Distribute the link or QR code to your WeChat customers:
<img src="https://cdn.link-ai.tech/doc/wechat-customer-service_use.png" width="600"/>
## 5. Usage
After WeChat users enter the customer service conversation via the link or QR code, they can chat with the AI across multiple turns, with support for text, image, and voice messages:
<img src="https://img-1317903499.cos.ap-guangzhou.myqcloud.com/docs/wxcustomer-hosting-chat-demo.jpg" width="900"/>
Beyond that, leveraging the official WeChat ecosystem, WeCom Customer Service can also be embedded into Official Accounts, Mini Programs, Video Channels and more. See the **WeCom Customer Service → Access Scenarios** section in the [WeCom Admin Console](https://work.weixin.qq.com/wework_admin/frame#/app/servicer) for details:
<img src="https://img-1317903499.cos.ap-guangzhou.myqcloud.com/docs/wxcustomer-hosting-interface-demo.png" width="800"/>
## FAQ
Make sure the following dependencies are installed:
```bash
pip install websocket-client pycryptodome
```

View File

@@ -0,0 +1,130 @@
---
title: WeChat カスタマーサービス
description: CowAgent を 微信客服WeCom Customer Serviceに統合する
---
WeCom の自建アプリを「微信客服WeCom Customer Service」アカウントにバインドすることで、CowAgent は外部 WeChat ユーザーからの問い合わせを引き受けることができます。WeChat ミニプログラム、公式アカウント、ビデオチャンネル、ビデオチャンネルストアなどから、リンクや QR コードで WeChat ユーザーに到達できます。
<Note>
WeChat カスタマーサービスは Docker デプロイまたはサーバー Python デプロイのみサポートしており、外部からアクセス可能なコールバック URL が必要です。ローカル実行モードには対応していません。
</Note>
## 1. 前提条件
必要なリソース:
1. パブリック IP を持つサーバー
2. 登録済みかつ認証済みの WeCom アカウント
3. 「微信客服」機能が有効になっていること
<Note>
カスタマーサービス専用に **新たな** 企業微信自建アプリを作成することを推奨します。既存の `wechatcom_app` アプリを流用すると、2 つのチャネルが同じコールバック URL を奪い合うことになります。
</Note>
## 2. 企業微信自建アプリの作成
1. [WeCom 管理コンソール](https://work.weixin.qq.com/wework_admin/frame#apps) で **アプリ管理 → アプリ作成** をクリックします:
<img src="https://cdn.link-ai.tech/doc/20260228103156.png" width="480"/>
2. **自社情報** をクリックし、ページ下部で **企業IDCorp ID** を確認します(`wechat_kf_corp_id` に設定します):
<img src="https://cdn.link-ai.tech/doc/wechatcom-hosting-companyid.png" width="600"/>
3. 上で作成したアプリに入り、Secret の隣の「**表示**」をクリックします。Secret は管理者のスマートフォンの WeCom アプリに送られ、そこで確認できます:
<img src="https://cdn.link-ai.tech/doc/wechatcom-hosting-agent-secret.png" width="600"/>
4. アプリの **メッセージ受信 → API 受信設定** に入り、右側の「**ランダム生成**」をクリックして **Token** と **EncodingAESKey** を生成し、保存します:
<img src="https://img-1317903499.cos.ap-guangzhou.myqcloud.com/docs/wechatcom-hosting-token-aeskey.jpg" width="600"/>
<Note>
プログラムがまだ起動していないため、この時点では API 受信設定を保存できません。プロジェクトが動作した後に戻って保存してください。
</Note>
## 3. 設定と起動
前のステップで収集した 4 つのフィールドCorp ID / Secret / Token / EncodingAESKeyを入力します
<Tabs>
<Tab title="Web コンソール">
Cow プロジェクトを起動した後、Web コンソールを開きます。**チャネル** メニューを選択し、**接入チャネル** をクリックし、**微信客服** を選択して、Corp ID / Secret / Token / AES Key を入力し(ポートはデフォルト 9888、変更可能、接入をクリックします。
<img src="https://cdn.link-ai.tech/doc/cow-weixinkefu-web-control.png" width="800"/>
</Tab>
<Tab title="設定ファイル">
`config.json` に以下の設定を追加します(各パラメータと WeCom コンソールの対応関係は上のスクリーンショットを参照してください):
```json
{
"channel_type": "wechat_kf",
"wechat_kf_corp_id": "YOUR_CORP_ID",
"wechat_kf_secret": "YOUR_SECRET",
"wechat_kf_token": "YOUR_TOKEN",
"wechat_kf_aes_key": "YOUR_AES_KEY",
"wechat_kf_port": 9888
}
```
| パラメータ | 説明 |
| --- | --- |
| `wechat_kf_corp_id` | 企業 ID |
| `wechat_kf_secret` | カスタマーサービスにバインドした企業微信自建アプリの Secret |
| `wechat_kf_token` | API 受信設定の Token |
| `wechat_kf_aes_key` | API 受信設定の EncodingAESKey |
| `wechat_kf_port` | リスンポート、デフォルトは 9888 |
</Tab>
</Tabs>
接入完了後、プログラムを起動しますWeb コンソール方式ではチャネルが自動的に再起動されます)。ログに `Listening on http://0.0.0.0:9888/wxkf/` と表示されれば、プログラムは正常に動作しています。このポートを外部に公開する必要があります(例:クラウドサーバーのセキュリティグループで許可します)。
続いて、WeCom 管理コンソールの **メッセージ受信 → API 受信設定** に戻り、コールバック URL に `http://<your-host>:9888/wxkf/` を入力して保存します。保存が成功したら、サーバー IP を **企業の信頼済み IP** に追加する必要もあります。追加しないとメッセージの送受信ができません:
<img src="https://cdn.link-ai.tech/doc/wechat-com_config.png" width="600"/>
<img src="https://cdn.link-ai.tech/doc/20260228103224.png" width="520"/>
<Warning>
URL のコールバック検証が通らない、または設定がうまくいかない場合:
1. サーバーのファイアウォールが無効になっており、セキュリティグループでリスンポート(デフォルト 9888が許可されていることを確認してください
2. Token、Secret、EncodingAESKey などのパラメータが一致しているか、URL の形式が正しいか慎重に確認してください
3. 認証済みの WeCom アカウントは、法人に対応する届け出済みドメインを設定する必要があります
</Warning>
## 4. 微信客服アカウントとのバインド
WeCom 管理コンソールの **微信客服** ページに入り、カスタマーサービスアカウントを作成し、上で作成した企業微信自建アプリとバインドします:
<img src="https://img-1317903499.cos.ap-guangzhou.myqcloud.com/docs/wxcustomer-hosting-step1.jpg" width="600"/>
<img src="https://img-1317903499.cos.ap-guangzhou.myqcloud.com/docs/wxcustomer-hosting-step2.jpg" width="600"/>
<img src="https://img-1317903499.cos.ap-guangzhou.myqcloud.com/docs/wxcustomer-hosting-step3.jpg" width="600"/>
バインド完了後、**微信客服 → 微信客服アカウント詳細** に入り、「**接入リンク**」の項目で:
- 「**リンクをコピー**」をクリックすると、`https://work.weixin.qq.com/kfid/kfcd83e5896b9ba07be` のような接入リンクが取得できます
- 「**QR コード生成**」をクリックすると、対応する QR コードが取得できます
リンクまたは QR コードを WeChat 顧客に配布してください:
<img src="https://cdn.link-ai.tech/doc/wechat-customer-service_use.png" width="600"/>
## 5. 使い方
WeChat ユーザーがリンクや QR コードからカスタマーサービス対話に入った後、AI とテキスト・画像・音声などのマルチターン対話が可能です:
<img src="https://img-1317903499.cos.ap-guangzhou.myqcloud.com/docs/wxcustomer-hosting-chat-demo.jpg" width="900"/>
これに加え、WeChat 公式エコシステムの機能に基づき、微信客服を公式アカウント、ミニプログラム、ビデオチャンネルなどの場面でも使用できます。詳細は [WeCom 管理コンソール](https://work.weixin.qq.com/wework_admin/frame#/app/servicer) の **微信客服 → 接入シナリオ** を参照してください:
<img src="https://img-1317903499.cos.ap-guangzhou.myqcloud.com/docs/wxcustomer-hosting-interface-demo.png" width="800"/>
## FAQ
以下の依存パッケージがインストールされていることを確認してください:
```bash
pip install websocket-client pycryptodome
```