diff --git a/app.py b/app.py index be4ad2eb..8dc1e970 100644 --- a/app.py +++ b/app.py @@ -231,6 +231,7 @@ def _clear_singleton_cache(channel_name: str): "wechatmp": "channel.wechatmp.wechatmp_channel.WechatMPChannel", "wechatmp_service": "channel.wechatmp.wechatmp_channel.WechatMPChannel", "wechatcom_app": "channel.wechatcom.wechatcomapp_channel.WechatComAppChannel", + const.WECHATCOM_KF: "channel.wechatcom_kf.wechatcom_kf_channel.WechatComKfChannel", const.FEISHU: "channel.feishu.feishu_channel.FeiShuChanel", const.DINGTALK: "channel.dingtalk.dingtalk_channel.DingTalkChanel", const.WECOM_BOT: "channel.wecom_bot.wecom_bot_channel.WecomBotChannel", diff --git a/channel/channel_factory.py b/channel/channel_factory.py index cf6bfea8..9f35e3fc 100644 --- a/channel/channel_factory.py +++ b/channel/channel_factory.py @@ -27,6 +27,9 @@ def create_channel(channel_type) -> Channel: elif channel_type == "wechatcom_app": from channel.wechatcom.wechatcomapp_channel import WechatComAppChannel ch = WechatComAppChannel() + elif channel_type == const.WECHATCOM_KF: + from channel.wechatcom_kf.wechatcom_kf_channel import WechatComKfChannel + ch = WechatComKfChannel() elif channel_type == const.FEISHU: from channel.feishu.feishu_channel import FeiShuChanel ch = FeiShuChanel() diff --git a/channel/wechatcom_kf/README.md b/channel/wechatcom_kf/README.md new file mode 100644 index 00000000..88edf29e --- /dev/null +++ b/channel/wechatcom_kf/README.md @@ -0,0 +1,140 @@ +# 微信客服(WeCom Customer Service)通道 + +> 与 `channel/wechatcom/`(企微自建应用)是两个**独立的 CoW 通道**: +> +> - 自建应用:**面向企业内部成员**(员工通过企业微信 App 与机器人对话)。 +> - 微信客服:**面向外部微信用户**(普通微信用户通过链接/二维码进入对话)。 +> +> 但底层都基于"企微自建应用"——本通道是**通过把一个企微自建应用绑定到微信客服账号**来实现 AI 接管对外咨询,详见 [LinkAI 微信客服接入文档](https://docs.link-ai.tech/platform/link-app/wechat-customer-service)。 + +## 一、接入流程概览 + +``` +┌─────────────────────┐ ┌─────────────────────┐ ┌──────────────────┐ +│ 1. 企业微信后台 │ → │ 2. CoW 配置回调 │ → │ 3. 绑定微信客服 │ +│ 创建一个自建应用 │ │ 端口 9899 │ │ 账号 │ +└─────────────────────┘ └─────────────────────┘ └──────────────────┘ + ↓ + 外部微信用户通过 + 链接/二维码 → + 消息 → CoW Bot +``` + +> **重要**:建议**单独再创建一个企微自建应用**用于微信客服,**不要复用**已经接入员工内部使用的那个 `wechatcom_app` 应用,否则两个通道会争抢同一个回调地址。 + +## 二、企业微信后台配置 + +### 1. 创建企微自建应用 + +进入 企业微信管理后台 → **应用管理** → **创建应用**。 + +### 2. 收集字段 + +| 字段 | 来源 | 对应 CoW 配置项 | +|---|---|---| +| 企业ID(CorpId) | 「我的企业」最下方 | `wechatcom_corp_id` | +| Secret | 进入应用详情 → 复制 | `wechatcom_kf_secret` | +| Token | 应用「接收消息 → 设置API接收」 | `wechatcom_kf_token` | +| EncodingAESKey | 应用「接收消息 → 设置API接收」 | `wechatcom_kf_aes_key` | + +> AgentId 在本通道**不需要**(消息发送走的是 `cgi-bin/kf/send_msg`,不依赖 agent_id)。 + +### 3. 配置回调地址 + 可信 IP + +在应用「**接收消息 → 设置API接收**」里填: + +- URL:`http://:9899/wxkf/`(公网必须可达) +- Token / EncodingAESKey:与下方 `config.json` 一致 + +回到应用详情页,把服务器公网 IP 填入「**企业可信IP**」。 + +### 4. 绑定微信客服账号 + +进入 企业微信后台 → **微信客服** → 创建客服账号 → **将该账号绑定到上一步创建的企微自建应用**。 + +绑定完成后,企业微信会给你一个客服访问链接 / 二维码,把它推给微信客户使用即可。 + +## 三、CoW 配置(`config.json`) + +```json +{ + "channel_type": "wechatcom_kf", + + "wechatcom_corp_id": "ww1234567890abcdef", + "wechatcom_kf_secret": "<企微应用的 Secret>", + "wechatcom_kf_token": "<接收消息 Token>", + "wechatcom_kf_aes_key": "", + "wechatcom_kf_port": 9899, + + "wechatcom_kf_cursor_dir": "tmp", + "wechatcom_kf_skip_history_on_first_start": true +} +``` + +| 字段 | 说明 | +|---|---| +| `wechatcom_corp_id` | 企业 ID,可与 `wechatcom_app` 共用 | +| `wechatcom_kf_secret` | **绑定到微信客服**的那个企微自建应用的 Secret(不是 wechatcomapp_secret) | +| `wechatcom_kf_token` | 该应用「接收消息」配置的 Token | +| `wechatcom_kf_aes_key` | 该应用「接收消息」配置的 EncodingAESKey | +| `wechatcom_kf_port` | 监听端口,默认 `9899`(避开 `wechatcomapp_port=9898`) | +| `wechatcom_kf_cursor_dir` | `next_cursor` 持久化目录,默认 `tmp/` | +| `wechatcom_kf_skip_history_on_first_start` | 首次启动(无 cursor)时跳过历史消息,强烈建议 `true`,否则会回放最近 14 天的消息把所有用户都骚扰一遍 | + +也支持环境变量:`WECHATCOM_CORP_ID` / `WECHATCOM_KF_SECRET` / `WECHATCOM_KF_TOKEN` / `WECHATCOM_KF_AES_KEY`。 + +## 四、运行 + +```bash +python app.py +``` + +启动后日志里会看到: + +``` +[wechatcom_kf] WeCom customer-service channel started +[wechatcom_kf] Listening on http://0.0.0.0:9899/wxkf/ +``` + +回到企微后台「设置API接收」点击保存——会触发 `GET /wxkf/?...&echostr=...`,CoW 通过 `crypto.check_signature` 校验后返回明文 `echostr`,验证成功。 + +## 五、与自建应用通道的运行差异 + +| 维度 | 自建应用 (`wechatcom_app`) | 微信客服 (`wechatcom_kf`) | +|---|---|---| +| 接收方式 | 回调直接 push,消息内容现成 | 回调只通知"有新消息",需调 `kf/sync_msg` 主动拉 | +| 接收方ID | `userid`(成员) | `external_userid`(外部用户)+ `open_kfid`(客服身份) | +| 发送接口 | `wechatpy` 内置封装 | 直接 POST `cgi-bin/kf/send_msg` | +| 端口 | 9898 | 9899 | +| 状态保存 | 无 | 必须持久化 `next_cursor`(本通道写本地 JSON) | + +## 六、cursor 持久化 + +`next_cursor` 是企微返回的"我上次拉到哪儿了"的书签。本通道把它存在 +`tmp/wechatcom_kf_cursors.json`(按 `open_kfid` 分键),重启不会丢。 + +**不要轻易删除该文件**,否则下次启动: + +- 若 `wechatcom_kf_skip_history_on_first_start=true`(默认):会触发"跳过历史消息"逻辑,**自动**把 cursor 推进到最新位置; +- 若改为 `false`:会把最近 14 天的全部历史消息当成新消息回放并自动回复。 + +## 七、多客服账号 + +一个企业可以创建多个客服账号(多个 `open_kfid`)共用一个企微应用。本通道 +**自动按 `open_kfid` 维护各自的 cursor**,无需为每个客服账号单独配置。 + +## 八、支持的回复类型 + +| ReplyType | 是否支持 | 备注 | +|---|---|---| +| `TEXT` / `INFO` / `ERROR` | ✅ | 自动按 2048 字节切片分段发送 | +| `IMAGE`(本地) / `IMAGE_URL`(网络) | ✅ | 大图自动压缩到 10MB 以内 | +| `VOICE` | ✅ | 转 amr 后发送,>60s 自动切片 | +| `VIDEO_URL` | ✅ | 通过临时素材接口上传 | +| `FILE` | ✅ | | + +## 九、参考文档 + +- [LinkAI 微信客服接入文档](https://docs.link-ai.tech/platform/link-app/wechat-customer-service) +- [企业微信开放接口 - 微信客服 - 接收消息](https://developer.work.weixin.qq.com/document/path/94670) +- [企业微信开放接口 - 微信客服 - 发送消息](https://developer.work.weixin.qq.com/document/path/95122) diff --git a/channel/wechatcom_kf/wechatcom_kf_channel.py b/channel/wechatcom_kf/wechatcom_kf_channel.py new file mode 100644 index 00000000..d8c3cff3 --- /dev/null +++ b/channel/wechatcom_kf/wechatcom_kf_channel.py @@ -0,0 +1,501 @@ +# -*- coding=utf-8 -*- +""" +WeCom Customer Service (微信客服) channel for CoW. + +Differences from `channel/wechatcom/` (企微自建应用): + 1. Audience: external WeChat users (not internal members). + 2. Receiver fields: `external_userid` + `open_kfid` instead of a single + member `userid`. + 3. Inbound flow: callback only delivers an event token, the actual + message bodies must be pulled via `cgi-bin/kf/sync_msg` with a + persistent cursor. See `wechatcom_kf_cursor_store.py`. + 4. Outbound flow: messages are sent via `cgi-bin/kf/send_msg` (each + request must specify both `touser` and `open_kfid`); wechatpy has + no native helper, so we call the HTTP endpoint directly. +""" +import io +import json +import os +import time +import xml.etree.ElementTree as ET +from typing import Optional + +import requests +import web +from wechatpy.enterprise import parse_message +from wechatpy.enterprise.crypto import WeChatCrypto +from wechatpy.enterprise.exceptions import InvalidCorpIdException +from wechatpy.exceptions import InvalidSignatureException, WeChatClientException + +from bridge.context import Context +from bridge.reply import Reply, ReplyType +from channel.chat_channel import ChatChannel +from channel.wechatcom.wechatcomapp_client import WechatComAppClient +from channel.wechatcom_kf.wechatcom_kf_cursor_store import CursorStore +from channel.wechatcom_kf.wechatcom_kf_message import WechatComKfMessage +from common.log import logger +from common.singleton import singleton +from common.utils import ( + compress_imgfile, + fsize, + remove_markdown_symbol, + split_string_by_utf8_length, +) +from config import conf + +try: + from voice.audio_convert import any_to_amr, split_audio +except ImportError as e: # voice features optional + logger.debug( + "[wechatcom_kf] import voice.audio_convert failed, voice will be disabled: {}".format(e) + ) + +MAX_UTF8_LEN = 2048 +KF_API_BASE = "https://qyapi.weixin.qq.com/cgi-bin/kf" +SYNC_MSG_LIMIT = 1000 + + +@singleton +class WechatComKfChannel(ChatChannel): + NOT_SUPPORT_REPLYTYPE = [] + + def __init__(self): + super().__init__() + self.corp_id = conf().get("wechatcom_corp_id") + self.secret = conf().get("wechatcom_kf_secret") + self.token = conf().get("wechatcom_kf_token") + self.aes_key = conf().get("wechatcom_kf_aes_key") + self._http_server = None + logger.info( + "[wechatcom_kf] Initializing WeCom customer-service channel, corp_id: {}".format( + self.corp_id + ) + ) + self.crypto = WeChatCrypto(self.token, self.aes_key, self.corp_id) + self.client = WechatComAppClient(self.corp_id, self.secret) + + cursor_dir = conf().get("wechatcom_kf_cursor_dir", "tmp") + cursor_path = os.path.join(cursor_dir, "wechatcom_kf_cursors.json") + self.cursor_store = CursorStore(cursor_path) + self.skip_history_on_first_start = conf().get( + "wechatcom_kf_skip_history_on_first_start", True + ) + + # ------------------------------------------------------------------ + # Lifecycle + # ------------------------------------------------------------------ + def startup(self): + urls = ("/wxkf/?", "channel.wechatcom_kf.wechatcom_kf_channel.Query") + app = web.application(urls, globals(), autoreload=False) + port = conf().get("wechatcom_kf_port", 9899) + logger.info("[wechatcom_kf] WeCom customer-service channel started") + logger.info("[wechatcom_kf] Listening on http://0.0.0.0:{}/wxkf/".format(port)) + func = web.httpserver.StaticMiddleware(app.wsgifunc()) + func = web.httpserver.LogMiddleware(func) + server = web.httpserver.WSGIServer(("0.0.0.0", port), func) + self._http_server = server + try: + server.start() + except (KeyboardInterrupt, SystemExit): + server.stop() + + def stop(self): + if self._http_server: + try: + self._http_server.stop() + logger.info("[wechatcom_kf] HTTP server stopped") + except Exception as e: + logger.warning(f"[wechatcom_kf] Error stopping HTTP server: {e}") + self._http_server = None + + # ------------------------------------------------------------------ + # Outbound — implementing the abstract `send` contract + # ------------------------------------------------------------------ + def send(self, reply: Reply, context: Context): + receiver = context["receiver"] + msg = context.kwargs.get("msg") + external_userid = context.get("external_userid") or (msg.external_userid if msg else None) + open_kfid = context.get("open_kfid") or (msg.open_kfid if msg else None) + + if not external_userid or not open_kfid: + logger.error( + "[wechatcom_kf] missing external_userid or open_kfid, cannot send: " + f"external_userid={external_userid}, open_kfid={open_kfid}" + ) + return + + if reply.type in [ReplyType.TEXT, ReplyType.ERROR, ReplyType.INFO]: + reply_text = remove_markdown_symbol(reply.content) + texts = split_string_by_utf8_length(reply_text, MAX_UTF8_LEN) + if len(texts) > 1: + logger.info( + "[wechatcom_kf] text too long, split into {} parts".format(len(texts)) + ) + for i, text in enumerate(texts): + self._send_text(external_userid, open_kfid, text) + if i != len(texts) - 1: + time.sleep(0.5) + logger.info("[wechatcom_kf] Do send text to {}: {}".format(receiver, reply_text)) + + elif reply.type == ReplyType.VOICE: + file_path = reply.content + try: + amr_file = os.path.splitext(file_path)[0] + ".amr" + any_to_amr(file_path, amr_file) + duration, files = split_audio(amr_file, 60 * 1000) + if len(files) > 1: + logger.info( + "[wechatcom_kf] voice too long {}s > 60s, split into {} parts".format( + duration / 1000.0, len(files) + ) + ) + media_ids = [] + for path in files: + with open(path, "rb") as f: + response = self.client.media.upload("voice", f) + logger.debug("[wechatcom_kf] upload voice response: {}".format(response)) + media_ids.append(response["media_id"]) + except ImportError as e: + logger.error("[wechatcom_kf] voice conversion failed: {}".format(e)) + logger.error("[wechatcom_kf] please install pydub: pip install pydub") + return + except WeChatClientException as e: + logger.error("[wechatcom_kf] upload voice failed: {}".format(e)) + return + + try: + os.remove(file_path) + if amr_file != file_path: + os.remove(amr_file) + except Exception: + pass + + for media_id in media_ids: + self._send_voice(external_userid, open_kfid, media_id) + time.sleep(1) + logger.info("[wechatcom_kf] sendVoice={}, receiver={}".format(reply.content, receiver)) + + elif reply.type == ReplyType.IMAGE_URL: + img_url = reply.content + pic_res = requests.get(img_url, stream=True) + image_storage = io.BytesIO() + for block in pic_res.iter_content(1024): + image_storage.write(block) + sz = fsize(image_storage) + if sz >= 10 * 1024 * 1024: + logger.info("[wechatcom_kf] image too large, compressing, sz={}".format(sz)) + image_storage = compress_imgfile(image_storage, 10 * 1024 * 1024 - 1) + image_storage.seek(0) + try: + response = self.client.media.upload("image", image_storage) + except WeChatClientException as e: + logger.error("[wechatcom_kf] upload image failed: {}".format(e)) + return + self._send_image(external_userid, open_kfid, response["media_id"]) + logger.info("[wechatcom_kf] sendImage url={}, receiver={}".format(img_url, receiver)) + + elif reply.type == ReplyType.IMAGE: + image_storage = reply.content + sz = fsize(image_storage) + if sz >= 10 * 1024 * 1024: + logger.info("[wechatcom_kf] image too large, compressing, sz={}".format(sz)) + image_storage = compress_imgfile(image_storage, 10 * 1024 * 1024 - 1) + image_storage.seek(0) + try: + response = self.client.media.upload("image", image_storage) + except WeChatClientException as e: + logger.error("[wechatcom_kf] upload image failed: {}".format(e)) + return + self._send_image(external_userid, open_kfid, response["media_id"]) + logger.info("[wechatcom_kf] sendImage, receiver={}".format(receiver)) + + elif reply.type == ReplyType.VIDEO_URL: + video_url = reply.content + try: + response = self.client.media.upload( + "video", requests.get(video_url, stream=True).content + ) + except WeChatClientException as e: + logger.error("[wechatcom_kf] upload video failed: {}".format(e)) + return + self._send_video(external_userid, open_kfid, response["media_id"]) + logger.info("[wechatcom_kf] sendVideo url={}, receiver={}".format(video_url, receiver)) + + elif reply.type == ReplyType.FILE: + file_path = reply.content + try: + with open(file_path, "rb") as f: + response = self.client.media.upload( + "file", (os.path.basename(file_path), f.read()) + ) + except WeChatClientException as e: + logger.error("[wechatcom_kf] upload file failed: {}".format(e)) + return + self._send_file(external_userid, open_kfid, response["media_id"]) + logger.info("[wechatcom_kf] sendFile={}, receiver={}".format(file_path, receiver)) + + else: + logger.warning("[wechatcom_kf] unsupported reply type: {}".format(reply.type)) + + # ------------------------------------------------------------------ + # Inbound — pull messages by cursor + # ------------------------------------------------------------------ + def consume_callback(self, token: str, open_kfid: str): + """ + Called from the HTTP `Query.POST` handler whenever WeCom notifies + us that there are new messages for `open_kfid`. Pulls all new + messages via sync_msg and feeds them into `produce()`. + """ + existing_cursor = self.cursor_store.get(open_kfid) + + # First-time bootstrap: avoid replaying up to 14 days of history. + if not existing_cursor and self.skip_history_on_first_start: + self._initialize_cursor(token, open_kfid) + return + + msgs = self._pull_messages(token, open_kfid, existing_cursor) + if not msgs: + return + for raw in msgs: + try: + kf_msg = WechatComKfMessage(msg=raw, client=self.client) + except NotImplementedError as e: + logger.debug("[wechatcom_kf] {}".format(e)) + continue + context = self._compose_context( + kf_msg.ctype, + kf_msg.content, + isgroup=False, + msg=kf_msg, + ) + if context: + self.produce(context) + time.sleep(0.05) # tiny gap between messages of the same batch + + def _initialize_cursor(self, token: str, open_kfid: str): + """ + Drain all current messages for this `open_kfid` without producing + any context, just to advance the cursor to "now". This prevents + a fresh deployment from replying to up to ~14 days of history. + """ + next_cursor = "" + total_skipped = 0 + while True: + data = self._call_sync_msg(token, open_kfid, next_cursor) + if data is None: + break + msg_list = data.get("msg_list") or [] + total_skipped += len(msg_list) + cursor_after = data.get("next_cursor") or "" + if cursor_after: + self.cursor_store.set(open_kfid, cursor_after) + if not data.get("has_more"): + break + if not cursor_after or cursor_after == next_cursor: + break + next_cursor = cursor_after + logger.info( + "[wechatcom_kf] first-start bootstrap finished for open_kfid={}, " + "skipped {} historical messages".format(open_kfid, total_skipped) + ) + + def _pull_messages(self, token: str, open_kfid: str, next_cursor: Optional[str]) -> list: + """Loop sync_msg until `has_more` is false. Returns raw msg dicts.""" + collected = [] + cursor = next_cursor or "" + while True: + data = self._call_sync_msg(token, open_kfid, cursor) + if data is None: + break + for item in data.get("msg_list") or []: + # Only consume messages from external users; ignore replies + # generated by our own kf account, otherwise we would loop + # back into ourselves. + if not item.get("external_userid"): + continue + if item.get("msgtype") in ("text", "image", "voice"): + collected.append(item) + cursor_after = data.get("next_cursor") or "" + if cursor_after: + self.cursor_store.set(open_kfid, cursor_after) + if not data.get("has_more"): + break + if not cursor_after or cursor_after == cursor: + break + cursor = cursor_after + + if collected: + collected = _dedup_image_text_pair(collected) + logger.info( + "[wechatcom_kf] pulled {} messages for open_kfid={}".format(len(collected), open_kfid) + ) + return collected + + def _call_sync_msg(self, token: str, open_kfid: str, cursor: str) -> Optional[dict]: + url = f"{KF_API_BASE}/sync_msg?access_token={self.client.fetch_access_token()}" + payload = { + "token": token, + "open_kfid": open_kfid, + "limit": SYNC_MSG_LIMIT, + } + if cursor: + payload["cursor"] = cursor + try: + resp = requests.post(url, json=payload, timeout=10).json() + except Exception as e: + logger.error(f"[wechatcom_kf] sync_msg request failed: {e}") + return None + + if resp.get("errcode") != 0: + logger.error( + f"[wechatcom_kf] sync_msg errcode={resp.get('errcode')}, " + f"errmsg={resp.get('errmsg')}, open_kfid={open_kfid}" + ) + return None + return resp + + # ------------------------------------------------------------------ + # Outbound HTTP wrappers (kf/send_msg) + # ------------------------------------------------------------------ + def _post_send_msg(self, payload: dict) -> dict: + url = f"{KF_API_BASE}/send_msg?access_token={self.client.fetch_access_token()}" + try: + resp = requests.post(url, json=payload, timeout=10).json() + except Exception as e: + logger.error(f"[wechatcom_kf] send_msg request failed: {e}") + return {"errcode": -1, "errmsg": str(e)} + if resp.get("errcode") != 0: + logger.error(f"[wechatcom_kf] send_msg failed, payload={payload}, resp={resp}") + return resp + + def _send_text(self, external_userid: str, open_kfid: str, content: str) -> dict: + return self._post_send_msg({ + "touser": external_userid, + "open_kfid": open_kfid, + "msgtype": "text", + "text": {"content": content}, + }) + + def _send_image(self, external_userid: str, open_kfid: str, media_id: str) -> dict: + return self._post_send_msg({ + "touser": external_userid, + "open_kfid": open_kfid, + "msgtype": "image", + "image": {"media_id": media_id}, + }) + + def _send_voice(self, external_userid: str, open_kfid: str, media_id: str) -> dict: + return self._post_send_msg({ + "touser": external_userid, + "open_kfid": open_kfid, + "msgtype": "voice", + "voice": {"media_id": media_id}, + }) + + def _send_video(self, external_userid: str, open_kfid: str, media_id: str) -> dict: + return self._post_send_msg({ + "touser": external_userid, + "open_kfid": open_kfid, + "msgtype": "video", + "video": {"media_id": media_id}, + }) + + def _send_file(self, external_userid: str, open_kfid: str, media_id: str) -> dict: + return self._post_send_msg({ + "touser": external_userid, + "open_kfid": open_kfid, + "msgtype": "file", + "file": {"media_id": media_id}, + }) + + def _send_link(self, external_userid: str, open_kfid: str, link_data: dict) -> dict: + return self._post_send_msg({ + "touser": external_userid, + "open_kfid": open_kfid, + "msgtype": "link", + "link": link_data, + }) + + +def _dedup_image_text_pair(messages: list) -> list: + """ + A WeChat user often sends an image immediately followed by a text + question (e.g. "what's in this picture?"). When that happens we + treat them as one combined turn: image first, then text. + Otherwise (single message or unrelated batch) we return only the + last message to avoid replying to stale history. + """ + if not messages: + return [] + + text_messages = [m for m in messages if m["msgtype"] == "text"] + image_messages = [m for m in messages if m["msgtype"] == "image"] + if text_messages and image_messages: + last_text_time = text_messages[-1]["send_time"] + first_image_time = image_messages[0]["send_time"] + if abs(last_text_time - first_image_time) <= 5: + return [image_messages[0], text_messages[-1]] + return [messages[-1]] + + +# ---------------------------------------------------------------------- +# HTTP handlers (web.py) +# ---------------------------------------------------------------------- +class Query: + def GET(self): + channel = WechatComKfChannel() + params = web.input() + logger.info("[wechatcom_kf] verify params: {}".format(params)) + try: + signature = params.msg_signature + timestamp = params.timestamp + nonce = params.nonce + echostr = params.echostr + echostr = channel.crypto.check_signature(signature, timestamp, nonce, echostr) + except (InvalidSignatureException, InvalidCorpIdException): + raise web.Forbidden() + return echostr + + def POST(self): + channel = WechatComKfChannel() + params = web.input() + try: + signature = params.msg_signature + timestamp = params.timestamp + nonce = params.nonce + raw_body = web.data() + decrypted = channel.crypto.decrypt_message(raw_body, signature, timestamp, nonce) + except (InvalidSignatureException, InvalidCorpIdException) as e: + logger.warning(f"[wechatcom_kf] invalid signature: {e}") + raise web.Forbidden() + + # We need the Token + OpenKfId fields from the inner XML to call + # sync_msg. wechatpy's parsed object exposes neither, so we parse + # the raw XML directly. + try: + root = ET.fromstring(decrypted) + except ET.ParseError as e: + logger.error(f"[wechatcom_kf] xml parse error: {e}") + return "success" + + msg_type = (root.findtext("MsgType") or "").strip() + event = (root.findtext("Event") or "").strip() + if msg_type != "event" or event != "kf_msg_or_event": + logger.debug( + f"[wechatcom_kf] ignored callback msg_type={msg_type}, event={event}" + ) + return "success" + + token = root.findtext("Token") or "" + open_kfid = root.findtext("OpenKfId") or "" + if not token or not open_kfid: + logger.warning( + f"[wechatcom_kf] callback missing token or open_kfid: {decrypted}" + ) + return "success" + + try: + channel.consume_callback(token, open_kfid) + except Exception as e: + logger.exception(f"[wechatcom_kf] consume_callback error: {e}") + return "success" diff --git a/channel/wechatcom_kf/wechatcom_kf_cursor_store.py b/channel/wechatcom_kf/wechatcom_kf_cursor_store.py new file mode 100644 index 00000000..445e2137 --- /dev/null +++ b/channel/wechatcom_kf/wechatcom_kf_cursor_store.py @@ -0,0 +1,74 @@ +# -*- coding=utf-8 -*- +""" +Local-file based persistence for WeCom customer-service `next_cursor`. + +Why we need this: + The WeCom customer-service (微信客服) callback only notifies us that + "new messages exist". To actually fetch them we must call the + `cgi-bin/kf/sync_msg` endpoint with a `cursor` so that we only get + messages newer than the previously processed one. If we lose this + cursor (e.g. on process restart) WeCom will replay up to ~14 days of + history, which would cause the bot to flood users with duplicate + replies. + +This implementation deliberately avoids any external dependency +(no Redis / no DB) — a single JSON file under the project's tmp dir is +enough for a CoW-style single-process deployment. +""" +import json +import os +import threading +from typing import Optional + +from common.log import logger + + +class CursorStore: + """Thread-safe per-`open_kfid` cursor store backed by a JSON file.""" + + def __init__(self, file_path: str): + self._file_path = file_path + self._lock = threading.Lock() + self._data = self._load() + + def _load(self) -> dict: + try: + if os.path.exists(self._file_path): + with open(self._file_path, "r", encoding="utf-8") as f: + return json.load(f) or {} + except Exception as e: + logger.warning(f"[wechatcom_kf] failed to load cursor file {self._file_path}: {e}") + return {} + + def _flush_locked(self): + # Atomic write: write to *.tmp first then rename, avoid corruption on crash. + tmp_path = self._file_path + ".tmp" + try: + os.makedirs(os.path.dirname(self._file_path) or ".", exist_ok=True) + with open(tmp_path, "w", encoding="utf-8") as f: + json.dump(self._data, f, ensure_ascii=False) + os.replace(tmp_path, self._file_path) + except Exception as e: + logger.warning(f"[wechatcom_kf] failed to flush cursor file {self._file_path}: {e}") + try: + if os.path.exists(tmp_path): + os.remove(tmp_path) + except Exception: + pass + + def get(self, open_kfid: str) -> Optional[str]: + with self._lock: + return self._data.get(open_kfid) + + def set(self, open_kfid: str, cursor: str): + if not cursor: + return + with self._lock: + if self._data.get(open_kfid) == cursor: + return + self._data[open_kfid] = cursor + self._flush_locked() + + def has(self, open_kfid: str) -> bool: + with self._lock: + return open_kfid in self._data diff --git a/channel/wechatcom_kf/wechatcom_kf_message.py b/channel/wechatcom_kf/wechatcom_kf_message.py new file mode 100644 index 00000000..85bfdf09 --- /dev/null +++ b/channel/wechatcom_kf/wechatcom_kf_message.py @@ -0,0 +1,82 @@ +# -*- coding=utf-8 -*- +""" +Adapter that turns a single `sync_msg` item from WeCom customer-service +into a CoW `ChatMessage` object. +""" +from wechatpy.enterprise import WeChatClient + +from bridge.context import ContextType +from channel.chat_message import ChatMessage +from common.log import logger +from common.tmp_dir import TmpDir + + +class WechatComKfMessage(ChatMessage): + """ + msg structure (from cgi-bin/kf/sync_msg): + { + "msgid": "...", + "send_time": 1700000000, + "origin": 3, + "msgtype": "text" | "image" | "voice" | ..., + "open_kfid": "wkxxxx", + "external_userid": "wmxxxx", + "text": {"content": "..."}, + "image": {"media_id": "..."}, + "voice": {"media_id": "..."}, + ... + } + """ + + def __init__(self, msg: dict, client: WeChatClient = None, is_group: bool = False): + # NOTE: skip parent constructor because it expects a wechatpy parsed + # message object, while here we receive a raw dict from sync_msg. + super().__init__(msg) + self.is_group = is_group + self.msg_id = msg.get("msgid") + self.create_time = msg.get("send_time") + self.origin = msg.get("origin") + self.msgtype = msg.get("msgtype") + self.open_kfid = msg.get("open_kfid") + self.external_userid = msg.get("external_userid") + + if self.msgtype == "text": + self.ctype = ContextType.TEXT + self.content = msg.get("text", {}).get("content", "") + elif self.msgtype == "image": + self.ctype = ContextType.IMAGE + media_id = msg.get("image", {}).get("media_id", "") + self.content = TmpDir().path() + media_id + ".jpg" + + def download_image(): + response = client.media.download(media_id) + if response.status_code == 200: + with open(self.content, "wb") as f: + f.write(response.content) + else: + logger.info(f"[wechatcom_kf] Failed to download image, {response.content}") + + self._prepare_fn = download_image + elif self.msgtype == "voice": + self.ctype = ContextType.VOICE + media_id = msg.get("voice", {}).get("media_id", "") + # WeCom returns amr by default; downstream voice pipeline will convert. + self.content = TmpDir().path() + media_id + ".amr" + + def download_voice(): + response = client.media.download(media_id) + if response.status_code == 200: + with open(self.content, "wb") as f: + f.write(response.content) + else: + logger.info(f"[wechatcom_kf] Failed to download voice, {response.content}") + + self._prepare_fn = download_voice + else: + raise NotImplementedError( + f"[wechatcom_kf] Unsupported message type: {self.msgtype}" + ) + + self.from_user_id = self.external_userid + self.to_user_id = self.open_kfid + self.other_user_id = self.external_userid diff --git a/common/const.py b/common/const.py index abe3c2c1..8227ff7d 100644 --- a/common/const.py +++ b/common/const.py @@ -227,3 +227,4 @@ DINGTALK = "dingtalk" WECOM_BOT = "wecom_bot" QQ = "qq" WEIXIN = "weixin" +WECHATCOM_KF = "wechatcom_kf" # WeCom customer service (微信客服) channel diff --git a/config.py b/config.py index d172fd3d..fe36a4c8 100644 --- a/config.py +++ b/config.py @@ -151,6 +151,14 @@ available_setting = { "wechatcomapp_secret": "", # 企业微信app的secret "wechatcomapp_agent_id": "", # 企业微信app的agent_id "wechatcomapp_aes_key": "", # 企业微信app的aes_key + # 微信客服(wechatcom_kf)的配置 + # 注意: 微信客服与企微自建应用是两套不同的应用,共用 corp_id,但 secret/token/aes_key 各自独立 + "wechatcom_kf_token": "", # 微信客服回调token + "wechatcom_kf_port": 9899, # 微信客服回调服务端口 + "wechatcom_kf_secret": "", # 微信客服应用的secret + "wechatcom_kf_aes_key": "", # 微信客服回调aes_key + "wechatcom_kf_cursor_dir": "tmp", # 拉取消息的cursor持久化目录(相对项目根) + "wechatcom_kf_skip_history_on_first_start": True, # 首次启动(无cursor)时跳过历史消息,只拉最新 # 飞书配置 "feishu_port": 80, # 飞书bot监听端口,仅webhook模式需要 "feishu_app_id": "", # 飞书机器人应用APP Id @@ -174,7 +182,7 @@ available_setting = { # chatgpt指令自定义触发词 "clear_memory_commands": ["#清除记忆"], # 重置会话指令,必须以#开头 # channel配置 - "channel_type": "", # 通道类型,支持多渠道同时运行。单个: "feishu",多个: "feishu, dingtalk" 或 ["feishu", "dingtalk"]。可选值: web,feishu,dingtalk,wecom_bot,weixin,wechatmp,wechatmp_service,wechatcom_app + "channel_type": "", # 通道类型,支持多渠道同时运行。单个: "feishu",多个: "feishu, dingtalk" 或 ["feishu", "dingtalk"]。可选值: web,feishu,dingtalk,wecom_bot,weixin,wechatmp,wechatmp_service,wechatcom_app,wechatcom_kf "web_console": True, # 是否自动启动Web控制台(默认启动)。设为False可禁用 "subscribe_msg": "", # 订阅消息, 支持: wechatmp, wechatmp_service, wechatcom_app "debug": False, # 是否开启debug模式,开启后会打印更多日志 @@ -410,6 +418,10 @@ def load_config(): "wechatmp_app_secret": "WECHATMP_APP_SECRET", "wechatcomapp_agent_id": "WECHATCOMAPP_AGENT_ID", "wechatcomapp_secret": "WECHATCOMAPP_SECRET", + "wechatcom_corp_id": "WECHATCOM_CORP_ID", + "wechatcom_kf_secret": "WECHATCOM_KF_SECRET", + "wechatcom_kf_token": "WECHATCOM_KF_TOKEN", + "wechatcom_kf_aes_key": "WECHATCOM_KF_AES_KEY", "qq_app_id": "QQ_APP_ID", "qq_app_secret": "QQ_APP_SECRET", "weixin_token": "WEIXIN_TOKEN",