feat(channel): add WeCom customer service (wechatcom_kf) channel

Introduce a new channel that integrates with WeCom Customer Service
(微信客服), separate from the existing self-built WeCom app channel.

- Register channel type `wechatcom_kf` in factory, app loader and const
- Add config keys for token / secret / aes_key / port / cursor dir and
  the first-start history-skip switch; also expose corresponding env vars
- Implement channel, message and cursor store under channel/wechatcom_kf/

Co-authored-by: Cursor <cursoragent@cursor.com>
This commit is contained in:
6vision
2026-05-21 19:58:47 +08:00
parent ac9d0f18c5
commit 11d92bb22a
8 changed files with 815 additions and 1 deletions

1
app.py
View File

@@ -231,6 +231,7 @@ def _clear_singleton_cache(channel_name: str):
"wechatmp": "channel.wechatmp.wechatmp_channel.WechatMPChannel",
"wechatmp_service": "channel.wechatmp.wechatmp_channel.WechatMPChannel",
"wechatcom_app": "channel.wechatcom.wechatcomapp_channel.WechatComAppChannel",
const.WECHATCOM_KF: "channel.wechatcom_kf.wechatcom_kf_channel.WechatComKfChannel",
const.FEISHU: "channel.feishu.feishu_channel.FeiShuChanel",
const.DINGTALK: "channel.dingtalk.dingtalk_channel.DingTalkChanel",
const.WECOM_BOT: "channel.wecom_bot.wecom_bot_channel.WecomBotChannel",

View File

@@ -27,6 +27,9 @@ def create_channel(channel_type) -> Channel:
elif channel_type == "wechatcom_app":
from channel.wechatcom.wechatcomapp_channel import WechatComAppChannel
ch = WechatComAppChannel()
elif channel_type == const.WECHATCOM_KF:
from channel.wechatcom_kf.wechatcom_kf_channel import WechatComKfChannel
ch = WechatComKfChannel()
elif channel_type == const.FEISHU:
from channel.feishu.feishu_channel import FeiShuChanel
ch = FeiShuChanel()

View File

@@ -0,0 +1,140 @@
# 微信客服WeCom Customer Service通道
> 与 `channel/wechatcom/`(企微自建应用)是两个**独立的 CoW 通道**
>
> - 自建应用:**面向企业内部成员**(员工通过企业微信 App 与机器人对话)。
> - 微信客服:**面向外部微信用户**(普通微信用户通过链接/二维码进入对话)。
>
> 但底层都基于"企微自建应用"——本通道是**通过把一个企微自建应用绑定到微信客服账号**来实现 AI 接管对外咨询,详见 [LinkAI 微信客服接入文档](https://docs.link-ai.tech/platform/link-app/wechat-customer-service)。
## 一、接入流程概览
```
┌─────────────────────┐ ┌─────────────────────┐ ┌──────────────────┐
│ 1. 企业微信后台 │ → │ 2. CoW 配置回调 │ → │ 3. 绑定微信客服 │
│ 创建一个自建应用 │ │ 端口 9899 │ │ 账号 │
└─────────────────────┘ └─────────────────────┘ └──────────────────┘
外部微信用户通过
链接/二维码 →
消息 → CoW Bot
```
> **重要**:建议**单独再创建一个企微自建应用**用于微信客服,**不要复用**已经接入员工内部使用的那个 `wechatcom_app` 应用,否则两个通道会争抢同一个回调地址。
## 二、企业微信后台配置
### 1. 创建企微自建应用
进入 企业微信管理后台 → **应用管理****创建应用**
### 2. 收集字段
| 字段 | 来源 | 对应 CoW 配置项 |
|---|---|---|
| 企业IDCorpId | 「我的企业」最下方 | `wechatcom_corp_id` |
| Secret | 进入应用详情 → 复制 | `wechatcom_kf_secret` |
| Token | 应用「接收消息 → 设置API接收」 | `wechatcom_kf_token` |
| EncodingAESKey | 应用「接收消息 → 设置API接收」 | `wechatcom_kf_aes_key` |
> AgentId 在本通道**不需要**(消息发送走的是 `cgi-bin/kf/send_msg`,不依赖 agent_id
### 3. 配置回调地址 + 可信 IP
在应用「**接收消息 → 设置API接收**」里填:
- URL`http://<your-host>:9899/wxkf/`(公网必须可达)
- Token / EncodingAESKey与下方 `config.json` 一致
回到应用详情页,把服务器公网 IP 填入「**企业可信IP**」。
### 4. 绑定微信客服账号
进入 企业微信后台 → **微信客服** → 创建客服账号 → **将该账号绑定到上一步创建的企微自建应用**
绑定完成后,企业微信会给你一个客服访问链接 / 二维码,把它推给微信客户使用即可。
## 三、CoW 配置(`config.json`
```json
{
"channel_type": "wechatcom_kf",
"wechatcom_corp_id": "ww1234567890abcdef",
"wechatcom_kf_secret": "<企微应用的 Secret>",
"wechatcom_kf_token": "<接收消息 Token>",
"wechatcom_kf_aes_key": "<EncodingAESKey>",
"wechatcom_kf_port": 9899,
"wechatcom_kf_cursor_dir": "tmp",
"wechatcom_kf_skip_history_on_first_start": true
}
```
| 字段 | 说明 |
|---|---|
| `wechatcom_corp_id` | 企业 ID可与 `wechatcom_app` 共用 |
| `wechatcom_kf_secret` | **绑定到微信客服**的那个企微自建应用的 Secret不是 wechatcomapp_secret |
| `wechatcom_kf_token` | 该应用「接收消息」配置的 Token |
| `wechatcom_kf_aes_key` | 该应用「接收消息」配置的 EncodingAESKey |
| `wechatcom_kf_port` | 监听端口,默认 `9899`(避开 `wechatcomapp_port=9898` |
| `wechatcom_kf_cursor_dir` | `next_cursor` 持久化目录,默认 `tmp/` |
| `wechatcom_kf_skip_history_on_first_start` | 首次启动(无 cursor时跳过历史消息强烈建议 `true`,否则会回放最近 14 天的消息把所有用户都骚扰一遍 |
也支持环境变量:`WECHATCOM_CORP_ID` / `WECHATCOM_KF_SECRET` / `WECHATCOM_KF_TOKEN` / `WECHATCOM_KF_AES_KEY`
## 四、运行
```bash
python app.py
```
启动后日志里会看到:
```
[wechatcom_kf] WeCom customer-service channel started
[wechatcom_kf] Listening on http://0.0.0.0:9899/wxkf/
```
回到企微后台「设置API接收」点击保存——会触发 `GET /wxkf/?...&echostr=...`CoW 通过 `crypto.check_signature` 校验后返回明文 `echostr`,验证成功。
## 五、与自建应用通道的运行差异
| 维度 | 自建应用 (`wechatcom_app`) | 微信客服 (`wechatcom_kf`) |
|---|---|---|
| 接收方式 | 回调直接 push消息内容现成 | 回调只通知"有新消息",需调 `kf/sync_msg` 主动拉 |
| 接收方ID | `userid`(成员) | `external_userid`(外部用户)+ `open_kfid`(客服身份) |
| 发送接口 | `wechatpy` 内置封装 | 直接 POST `cgi-bin/kf/send_msg` |
| 端口 | 9898 | 9899 |
| 状态保存 | 无 | 必须持久化 `next_cursor`(本通道写本地 JSON |
## 六、cursor 持久化
`next_cursor` 是企微返回的"我上次拉到哪儿了"的书签。本通道把它存在
`tmp/wechatcom_kf_cursors.json`(按 `open_kfid` 分键),重启不会丢。
**不要轻易删除该文件**,否则下次启动:
-`wechatcom_kf_skip_history_on_first_start=true`(默认):会触发"跳过历史消息"逻辑,**自动**把 cursor 推进到最新位置;
- 若改为 `false`:会把最近 14 天的全部历史消息当成新消息回放并自动回复。
## 七、多客服账号
一个企业可以创建多个客服账号(多个 `open_kfid`)共用一个企微应用。本通道
**自动按 `open_kfid` 维护各自的 cursor**,无需为每个客服账号单独配置。
## 八、支持的回复类型
| ReplyType | 是否支持 | 备注 |
|---|---|---|
| `TEXT` / `INFO` / `ERROR` | ✅ | 自动按 2048 字节切片分段发送 |
| `IMAGE`(本地) / `IMAGE_URL`(网络) | ✅ | 大图自动压缩到 10MB 以内 |
| `VOICE` | ✅ | 转 amr 后发送,>60s 自动切片 |
| `VIDEO_URL` | ✅ | 通过临时素材接口上传 |
| `FILE` | ✅ | |
## 九、参考文档
- [LinkAI 微信客服接入文档](https://docs.link-ai.tech/platform/link-app/wechat-customer-service)
- [企业微信开放接口 - 微信客服 - 接收消息](https://developer.work.weixin.qq.com/document/path/94670)
- [企业微信开放接口 - 微信客服 - 发送消息](https://developer.work.weixin.qq.com/document/path/95122)

View File

@@ -0,0 +1,501 @@
# -*- coding=utf-8 -*-
"""
WeCom Customer Service (微信客服) channel for CoW.
Differences from `channel/wechatcom/` (企微自建应用):
1. Audience: external WeChat users (not internal members).
2. Receiver fields: `external_userid` + `open_kfid` instead of a single
member `userid`.
3. Inbound flow: callback only delivers an event token, the actual
message bodies must be pulled via `cgi-bin/kf/sync_msg` with a
persistent cursor. See `wechatcom_kf_cursor_store.py`.
4. Outbound flow: messages are sent via `cgi-bin/kf/send_msg` (each
request must specify both `touser` and `open_kfid`); wechatpy has
no native helper, so we call the HTTP endpoint directly.
"""
import io
import json
import os
import time
import xml.etree.ElementTree as ET
from typing import Optional
import requests
import web
from wechatpy.enterprise import parse_message
from wechatpy.enterprise.crypto import WeChatCrypto
from wechatpy.enterprise.exceptions import InvalidCorpIdException
from wechatpy.exceptions import InvalidSignatureException, WeChatClientException
from bridge.context import Context
from bridge.reply import Reply, ReplyType
from channel.chat_channel import ChatChannel
from channel.wechatcom.wechatcomapp_client import WechatComAppClient
from channel.wechatcom_kf.wechatcom_kf_cursor_store import CursorStore
from channel.wechatcom_kf.wechatcom_kf_message import WechatComKfMessage
from common.log import logger
from common.singleton import singleton
from common.utils import (
compress_imgfile,
fsize,
remove_markdown_symbol,
split_string_by_utf8_length,
)
from config import conf
try:
from voice.audio_convert import any_to_amr, split_audio
except ImportError as e: # voice features optional
logger.debug(
"[wechatcom_kf] import voice.audio_convert failed, voice will be disabled: {}".format(e)
)
MAX_UTF8_LEN = 2048
KF_API_BASE = "https://qyapi.weixin.qq.com/cgi-bin/kf"
SYNC_MSG_LIMIT = 1000
@singleton
class WechatComKfChannel(ChatChannel):
NOT_SUPPORT_REPLYTYPE = []
def __init__(self):
super().__init__()
self.corp_id = conf().get("wechatcom_corp_id")
self.secret = conf().get("wechatcom_kf_secret")
self.token = conf().get("wechatcom_kf_token")
self.aes_key = conf().get("wechatcom_kf_aes_key")
self._http_server = None
logger.info(
"[wechatcom_kf] Initializing WeCom customer-service channel, corp_id: {}".format(
self.corp_id
)
)
self.crypto = WeChatCrypto(self.token, self.aes_key, self.corp_id)
self.client = WechatComAppClient(self.corp_id, self.secret)
cursor_dir = conf().get("wechatcom_kf_cursor_dir", "tmp")
cursor_path = os.path.join(cursor_dir, "wechatcom_kf_cursors.json")
self.cursor_store = CursorStore(cursor_path)
self.skip_history_on_first_start = conf().get(
"wechatcom_kf_skip_history_on_first_start", True
)
# ------------------------------------------------------------------
# Lifecycle
# ------------------------------------------------------------------
def startup(self):
urls = ("/wxkf/?", "channel.wechatcom_kf.wechatcom_kf_channel.Query")
app = web.application(urls, globals(), autoreload=False)
port = conf().get("wechatcom_kf_port", 9899)
logger.info("[wechatcom_kf] WeCom customer-service channel started")
logger.info("[wechatcom_kf] Listening on http://0.0.0.0:{}/wxkf/".format(port))
func = web.httpserver.StaticMiddleware(app.wsgifunc())
func = web.httpserver.LogMiddleware(func)
server = web.httpserver.WSGIServer(("0.0.0.0", port), func)
self._http_server = server
try:
server.start()
except (KeyboardInterrupt, SystemExit):
server.stop()
def stop(self):
if self._http_server:
try:
self._http_server.stop()
logger.info("[wechatcom_kf] HTTP server stopped")
except Exception as e:
logger.warning(f"[wechatcom_kf] Error stopping HTTP server: {e}")
self._http_server = None
# ------------------------------------------------------------------
# Outbound — implementing the abstract `send` contract
# ------------------------------------------------------------------
def send(self, reply: Reply, context: Context):
receiver = context["receiver"]
msg = context.kwargs.get("msg")
external_userid = context.get("external_userid") or (msg.external_userid if msg else None)
open_kfid = context.get("open_kfid") or (msg.open_kfid if msg else None)
if not external_userid or not open_kfid:
logger.error(
"[wechatcom_kf] missing external_userid or open_kfid, cannot send: "
f"external_userid={external_userid}, open_kfid={open_kfid}"
)
return
if reply.type in [ReplyType.TEXT, ReplyType.ERROR, ReplyType.INFO]:
reply_text = remove_markdown_symbol(reply.content)
texts = split_string_by_utf8_length(reply_text, MAX_UTF8_LEN)
if len(texts) > 1:
logger.info(
"[wechatcom_kf] text too long, split into {} parts".format(len(texts))
)
for i, text in enumerate(texts):
self._send_text(external_userid, open_kfid, text)
if i != len(texts) - 1:
time.sleep(0.5)
logger.info("[wechatcom_kf] Do send text to {}: {}".format(receiver, reply_text))
elif reply.type == ReplyType.VOICE:
file_path = reply.content
try:
amr_file = os.path.splitext(file_path)[0] + ".amr"
any_to_amr(file_path, amr_file)
duration, files = split_audio(amr_file, 60 * 1000)
if len(files) > 1:
logger.info(
"[wechatcom_kf] voice too long {}s > 60s, split into {} parts".format(
duration / 1000.0, len(files)
)
)
media_ids = []
for path in files:
with open(path, "rb") as f:
response = self.client.media.upload("voice", f)
logger.debug("[wechatcom_kf] upload voice response: {}".format(response))
media_ids.append(response["media_id"])
except ImportError as e:
logger.error("[wechatcom_kf] voice conversion failed: {}".format(e))
logger.error("[wechatcom_kf] please install pydub: pip install pydub")
return
except WeChatClientException as e:
logger.error("[wechatcom_kf] upload voice failed: {}".format(e))
return
try:
os.remove(file_path)
if amr_file != file_path:
os.remove(amr_file)
except Exception:
pass
for media_id in media_ids:
self._send_voice(external_userid, open_kfid, media_id)
time.sleep(1)
logger.info("[wechatcom_kf] sendVoice={}, receiver={}".format(reply.content, receiver))
elif reply.type == ReplyType.IMAGE_URL:
img_url = reply.content
pic_res = requests.get(img_url, stream=True)
image_storage = io.BytesIO()
for block in pic_res.iter_content(1024):
image_storage.write(block)
sz = fsize(image_storage)
if sz >= 10 * 1024 * 1024:
logger.info("[wechatcom_kf] image too large, compressing, sz={}".format(sz))
image_storage = compress_imgfile(image_storage, 10 * 1024 * 1024 - 1)
image_storage.seek(0)
try:
response = self.client.media.upload("image", image_storage)
except WeChatClientException as e:
logger.error("[wechatcom_kf] upload image failed: {}".format(e))
return
self._send_image(external_userid, open_kfid, response["media_id"])
logger.info("[wechatcom_kf] sendImage url={}, receiver={}".format(img_url, receiver))
elif reply.type == ReplyType.IMAGE:
image_storage = reply.content
sz = fsize(image_storage)
if sz >= 10 * 1024 * 1024:
logger.info("[wechatcom_kf] image too large, compressing, sz={}".format(sz))
image_storage = compress_imgfile(image_storage, 10 * 1024 * 1024 - 1)
image_storage.seek(0)
try:
response = self.client.media.upload("image", image_storage)
except WeChatClientException as e:
logger.error("[wechatcom_kf] upload image failed: {}".format(e))
return
self._send_image(external_userid, open_kfid, response["media_id"])
logger.info("[wechatcom_kf] sendImage, receiver={}".format(receiver))
elif reply.type == ReplyType.VIDEO_URL:
video_url = reply.content
try:
response = self.client.media.upload(
"video", requests.get(video_url, stream=True).content
)
except WeChatClientException as e:
logger.error("[wechatcom_kf] upload video failed: {}".format(e))
return
self._send_video(external_userid, open_kfid, response["media_id"])
logger.info("[wechatcom_kf] sendVideo url={}, receiver={}".format(video_url, receiver))
elif reply.type == ReplyType.FILE:
file_path = reply.content
try:
with open(file_path, "rb") as f:
response = self.client.media.upload(
"file", (os.path.basename(file_path), f.read())
)
except WeChatClientException as e:
logger.error("[wechatcom_kf] upload file failed: {}".format(e))
return
self._send_file(external_userid, open_kfid, response["media_id"])
logger.info("[wechatcom_kf] sendFile={}, receiver={}".format(file_path, receiver))
else:
logger.warning("[wechatcom_kf] unsupported reply type: {}".format(reply.type))
# ------------------------------------------------------------------
# Inbound — pull messages by cursor
# ------------------------------------------------------------------
def consume_callback(self, token: str, open_kfid: str):
"""
Called from the HTTP `Query.POST` handler whenever WeCom notifies
us that there are new messages for `open_kfid`. Pulls all new
messages via sync_msg and feeds them into `produce()`.
"""
existing_cursor = self.cursor_store.get(open_kfid)
# First-time bootstrap: avoid replaying up to 14 days of history.
if not existing_cursor and self.skip_history_on_first_start:
self._initialize_cursor(token, open_kfid)
return
msgs = self._pull_messages(token, open_kfid, existing_cursor)
if not msgs:
return
for raw in msgs:
try:
kf_msg = WechatComKfMessage(msg=raw, client=self.client)
except NotImplementedError as e:
logger.debug("[wechatcom_kf] {}".format(e))
continue
context = self._compose_context(
kf_msg.ctype,
kf_msg.content,
isgroup=False,
msg=kf_msg,
)
if context:
self.produce(context)
time.sleep(0.05) # tiny gap between messages of the same batch
def _initialize_cursor(self, token: str, open_kfid: str):
"""
Drain all current messages for this `open_kfid` without producing
any context, just to advance the cursor to "now". This prevents
a fresh deployment from replying to up to ~14 days of history.
"""
next_cursor = ""
total_skipped = 0
while True:
data = self._call_sync_msg(token, open_kfid, next_cursor)
if data is None:
break
msg_list = data.get("msg_list") or []
total_skipped += len(msg_list)
cursor_after = data.get("next_cursor") or ""
if cursor_after:
self.cursor_store.set(open_kfid, cursor_after)
if not data.get("has_more"):
break
if not cursor_after or cursor_after == next_cursor:
break
next_cursor = cursor_after
logger.info(
"[wechatcom_kf] first-start bootstrap finished for open_kfid={}, "
"skipped {} historical messages".format(open_kfid, total_skipped)
)
def _pull_messages(self, token: str, open_kfid: str, next_cursor: Optional[str]) -> list:
"""Loop sync_msg until `has_more` is false. Returns raw msg dicts."""
collected = []
cursor = next_cursor or ""
while True:
data = self._call_sync_msg(token, open_kfid, cursor)
if data is None:
break
for item in data.get("msg_list") or []:
# Only consume messages from external users; ignore replies
# generated by our own kf account, otherwise we would loop
# back into ourselves.
if not item.get("external_userid"):
continue
if item.get("msgtype") in ("text", "image", "voice"):
collected.append(item)
cursor_after = data.get("next_cursor") or ""
if cursor_after:
self.cursor_store.set(open_kfid, cursor_after)
if not data.get("has_more"):
break
if not cursor_after or cursor_after == cursor:
break
cursor = cursor_after
if collected:
collected = _dedup_image_text_pair(collected)
logger.info(
"[wechatcom_kf] pulled {} messages for open_kfid={}".format(len(collected), open_kfid)
)
return collected
def _call_sync_msg(self, token: str, open_kfid: str, cursor: str) -> Optional[dict]:
url = f"{KF_API_BASE}/sync_msg?access_token={self.client.fetch_access_token()}"
payload = {
"token": token,
"open_kfid": open_kfid,
"limit": SYNC_MSG_LIMIT,
}
if cursor:
payload["cursor"] = cursor
try:
resp = requests.post(url, json=payload, timeout=10).json()
except Exception as e:
logger.error(f"[wechatcom_kf] sync_msg request failed: {e}")
return None
if resp.get("errcode") != 0:
logger.error(
f"[wechatcom_kf] sync_msg errcode={resp.get('errcode')}, "
f"errmsg={resp.get('errmsg')}, open_kfid={open_kfid}"
)
return None
return resp
# ------------------------------------------------------------------
# Outbound HTTP wrappers (kf/send_msg)
# ------------------------------------------------------------------
def _post_send_msg(self, payload: dict) -> dict:
url = f"{KF_API_BASE}/send_msg?access_token={self.client.fetch_access_token()}"
try:
resp = requests.post(url, json=payload, timeout=10).json()
except Exception as e:
logger.error(f"[wechatcom_kf] send_msg request failed: {e}")
return {"errcode": -1, "errmsg": str(e)}
if resp.get("errcode") != 0:
logger.error(f"[wechatcom_kf] send_msg failed, payload={payload}, resp={resp}")
return resp
def _send_text(self, external_userid: str, open_kfid: str, content: str) -> dict:
return self._post_send_msg({
"touser": external_userid,
"open_kfid": open_kfid,
"msgtype": "text",
"text": {"content": content},
})
def _send_image(self, external_userid: str, open_kfid: str, media_id: str) -> dict:
return self._post_send_msg({
"touser": external_userid,
"open_kfid": open_kfid,
"msgtype": "image",
"image": {"media_id": media_id},
})
def _send_voice(self, external_userid: str, open_kfid: str, media_id: str) -> dict:
return self._post_send_msg({
"touser": external_userid,
"open_kfid": open_kfid,
"msgtype": "voice",
"voice": {"media_id": media_id},
})
def _send_video(self, external_userid: str, open_kfid: str, media_id: str) -> dict:
return self._post_send_msg({
"touser": external_userid,
"open_kfid": open_kfid,
"msgtype": "video",
"video": {"media_id": media_id},
})
def _send_file(self, external_userid: str, open_kfid: str, media_id: str) -> dict:
return self._post_send_msg({
"touser": external_userid,
"open_kfid": open_kfid,
"msgtype": "file",
"file": {"media_id": media_id},
})
def _send_link(self, external_userid: str, open_kfid: str, link_data: dict) -> dict:
return self._post_send_msg({
"touser": external_userid,
"open_kfid": open_kfid,
"msgtype": "link",
"link": link_data,
})
def _dedup_image_text_pair(messages: list) -> list:
"""
A WeChat user often sends an image immediately followed by a text
question (e.g. "what's in this picture?"). When that happens we
treat them as one combined turn: image first, then text.
Otherwise (single message or unrelated batch) we return only the
last message to avoid replying to stale history.
"""
if not messages:
return []
text_messages = [m for m in messages if m["msgtype"] == "text"]
image_messages = [m for m in messages if m["msgtype"] == "image"]
if text_messages and image_messages:
last_text_time = text_messages[-1]["send_time"]
first_image_time = image_messages[0]["send_time"]
if abs(last_text_time - first_image_time) <= 5:
return [image_messages[0], text_messages[-1]]
return [messages[-1]]
# ----------------------------------------------------------------------
# HTTP handlers (web.py)
# ----------------------------------------------------------------------
class Query:
def GET(self):
channel = WechatComKfChannel()
params = web.input()
logger.info("[wechatcom_kf] verify params: {}".format(params))
try:
signature = params.msg_signature
timestamp = params.timestamp
nonce = params.nonce
echostr = params.echostr
echostr = channel.crypto.check_signature(signature, timestamp, nonce, echostr)
except (InvalidSignatureException, InvalidCorpIdException):
raise web.Forbidden()
return echostr
def POST(self):
channel = WechatComKfChannel()
params = web.input()
try:
signature = params.msg_signature
timestamp = params.timestamp
nonce = params.nonce
raw_body = web.data()
decrypted = channel.crypto.decrypt_message(raw_body, signature, timestamp, nonce)
except (InvalidSignatureException, InvalidCorpIdException) as e:
logger.warning(f"[wechatcom_kf] invalid signature: {e}")
raise web.Forbidden()
# We need the Token + OpenKfId fields from the inner XML to call
# sync_msg. wechatpy's parsed object exposes neither, so we parse
# the raw XML directly.
try:
root = ET.fromstring(decrypted)
except ET.ParseError as e:
logger.error(f"[wechatcom_kf] xml parse error: {e}")
return "success"
msg_type = (root.findtext("MsgType") or "").strip()
event = (root.findtext("Event") or "").strip()
if msg_type != "event" or event != "kf_msg_or_event":
logger.debug(
f"[wechatcom_kf] ignored callback msg_type={msg_type}, event={event}"
)
return "success"
token = root.findtext("Token") or ""
open_kfid = root.findtext("OpenKfId") or ""
if not token or not open_kfid:
logger.warning(
f"[wechatcom_kf] callback missing token or open_kfid: {decrypted}"
)
return "success"
try:
channel.consume_callback(token, open_kfid)
except Exception as e:
logger.exception(f"[wechatcom_kf] consume_callback error: {e}")
return "success"

View File

@@ -0,0 +1,74 @@
# -*- coding=utf-8 -*-
"""
Local-file based persistence for WeCom customer-service `next_cursor`.
Why we need this:
The WeCom customer-service (微信客服) callback only notifies us that
"new messages exist". To actually fetch them we must call the
`cgi-bin/kf/sync_msg` endpoint with a `cursor` so that we only get
messages newer than the previously processed one. If we lose this
cursor (e.g. on process restart) WeCom will replay up to ~14 days of
history, which would cause the bot to flood users with duplicate
replies.
This implementation deliberately avoids any external dependency
(no Redis / no DB) — a single JSON file under the project's tmp dir is
enough for a CoW-style single-process deployment.
"""
import json
import os
import threading
from typing import Optional
from common.log import logger
class CursorStore:
"""Thread-safe per-`open_kfid` cursor store backed by a JSON file."""
def __init__(self, file_path: str):
self._file_path = file_path
self._lock = threading.Lock()
self._data = self._load()
def _load(self) -> dict:
try:
if os.path.exists(self._file_path):
with open(self._file_path, "r", encoding="utf-8") as f:
return json.load(f) or {}
except Exception as e:
logger.warning(f"[wechatcom_kf] failed to load cursor file {self._file_path}: {e}")
return {}
def _flush_locked(self):
# Atomic write: write to *.tmp first then rename, avoid corruption on crash.
tmp_path = self._file_path + ".tmp"
try:
os.makedirs(os.path.dirname(self._file_path) or ".", exist_ok=True)
with open(tmp_path, "w", encoding="utf-8") as f:
json.dump(self._data, f, ensure_ascii=False)
os.replace(tmp_path, self._file_path)
except Exception as e:
logger.warning(f"[wechatcom_kf] failed to flush cursor file {self._file_path}: {e}")
try:
if os.path.exists(tmp_path):
os.remove(tmp_path)
except Exception:
pass
def get(self, open_kfid: str) -> Optional[str]:
with self._lock:
return self._data.get(open_kfid)
def set(self, open_kfid: str, cursor: str):
if not cursor:
return
with self._lock:
if self._data.get(open_kfid) == cursor:
return
self._data[open_kfid] = cursor
self._flush_locked()
def has(self, open_kfid: str) -> bool:
with self._lock:
return open_kfid in self._data

View File

@@ -0,0 +1,82 @@
# -*- coding=utf-8 -*-
"""
Adapter that turns a single `sync_msg` item from WeCom customer-service
into a CoW `ChatMessage` object.
"""
from wechatpy.enterprise import WeChatClient
from bridge.context import ContextType
from channel.chat_message import ChatMessage
from common.log import logger
from common.tmp_dir import TmpDir
class WechatComKfMessage(ChatMessage):
"""
msg structure (from cgi-bin/kf/sync_msg):
{
"msgid": "...",
"send_time": 1700000000,
"origin": 3,
"msgtype": "text" | "image" | "voice" | ...,
"open_kfid": "wkxxxx",
"external_userid": "wmxxxx",
"text": {"content": "..."},
"image": {"media_id": "..."},
"voice": {"media_id": "..."},
...
}
"""
def __init__(self, msg: dict, client: WeChatClient = None, is_group: bool = False):
# NOTE: skip parent constructor because it expects a wechatpy parsed
# message object, while here we receive a raw dict from sync_msg.
super().__init__(msg)
self.is_group = is_group
self.msg_id = msg.get("msgid")
self.create_time = msg.get("send_time")
self.origin = msg.get("origin")
self.msgtype = msg.get("msgtype")
self.open_kfid = msg.get("open_kfid")
self.external_userid = msg.get("external_userid")
if self.msgtype == "text":
self.ctype = ContextType.TEXT
self.content = msg.get("text", {}).get("content", "")
elif self.msgtype == "image":
self.ctype = ContextType.IMAGE
media_id = msg.get("image", {}).get("media_id", "")
self.content = TmpDir().path() + media_id + ".jpg"
def download_image():
response = client.media.download(media_id)
if response.status_code == 200:
with open(self.content, "wb") as f:
f.write(response.content)
else:
logger.info(f"[wechatcom_kf] Failed to download image, {response.content}")
self._prepare_fn = download_image
elif self.msgtype == "voice":
self.ctype = ContextType.VOICE
media_id = msg.get("voice", {}).get("media_id", "")
# WeCom returns amr by default; downstream voice pipeline will convert.
self.content = TmpDir().path() + media_id + ".amr"
def download_voice():
response = client.media.download(media_id)
if response.status_code == 200:
with open(self.content, "wb") as f:
f.write(response.content)
else:
logger.info(f"[wechatcom_kf] Failed to download voice, {response.content}")
self._prepare_fn = download_voice
else:
raise NotImplementedError(
f"[wechatcom_kf] Unsupported message type: {self.msgtype}"
)
self.from_user_id = self.external_userid
self.to_user_id = self.open_kfid
self.other_user_id = self.external_userid

View File

@@ -227,3 +227,4 @@ DINGTALK = "dingtalk"
WECOM_BOT = "wecom_bot"
QQ = "qq"
WEIXIN = "weixin"
WECHATCOM_KF = "wechatcom_kf" # WeCom customer service (微信客服) channel

View File

@@ -151,6 +151,14 @@ available_setting = {
"wechatcomapp_secret": "", # 企业微信app的secret
"wechatcomapp_agent_id": "", # 企业微信app的agent_id
"wechatcomapp_aes_key": "", # 企业微信app的aes_key
# 微信客服(wechatcom_kf)的配置
# 注意: 微信客服与企微自建应用是两套不同的应用,共用 corp_id,但 secret/token/aes_key 各自独立
"wechatcom_kf_token": "", # 微信客服回调token
"wechatcom_kf_port": 9899, # 微信客服回调服务端口
"wechatcom_kf_secret": "", # 微信客服应用的secret
"wechatcom_kf_aes_key": "", # 微信客服回调aes_key
"wechatcom_kf_cursor_dir": "tmp", # 拉取消息的cursor持久化目录(相对项目根)
"wechatcom_kf_skip_history_on_first_start": True, # 首次启动(无cursor)时跳过历史消息,只拉最新
# 飞书配置
"feishu_port": 80, # 飞书bot监听端口仅webhook模式需要
"feishu_app_id": "", # 飞书机器人应用APP Id
@@ -174,7 +182,7 @@ available_setting = {
# chatgpt指令自定义触发词
"clear_memory_commands": ["#清除记忆"], # 重置会话指令,必须以#开头
# channel配置
"channel_type": "", # 通道类型,支持多渠道同时运行。单个: "feishu",多个: "feishu, dingtalk" 或 ["feishu", "dingtalk"]。可选值: web,feishu,dingtalk,wecom_bot,weixin,wechatmp,wechatmp_service,wechatcom_app
"channel_type": "", # 通道类型,支持多渠道同时运行。单个: "feishu",多个: "feishu, dingtalk" 或 ["feishu", "dingtalk"]。可选值: web,feishu,dingtalk,wecom_bot,weixin,wechatmp,wechatmp_service,wechatcom_app,wechatcom_kf
"web_console": True, # 是否自动启动Web控制台默认启动。设为False可禁用
"subscribe_msg": "", # 订阅消息, 支持: wechatmp, wechatmp_service, wechatcom_app
"debug": False, # 是否开启debug模式开启后会打印更多日志
@@ -410,6 +418,10 @@ def load_config():
"wechatmp_app_secret": "WECHATMP_APP_SECRET",
"wechatcomapp_agent_id": "WECHATCOMAPP_AGENT_ID",
"wechatcomapp_secret": "WECHATCOMAPP_SECRET",
"wechatcom_corp_id": "WECHATCOM_CORP_ID",
"wechatcom_kf_secret": "WECHATCOM_KF_SECRET",
"wechatcom_kf_token": "WECHATCOM_KF_TOKEN",
"wechatcom_kf_aes_key": "WECHATCOM_KF_AES_KEY",
"qq_app_id": "QQ_APP_ID",
"qq_app_secret": "QQ_APP_SECRET",
"weixin_token": "WEIXIN_TOKEN",