diff --git a/channel/wechat_kf/wechat_kf_channel.py b/channel/wechat_kf/wechat_kf_channel.py index 7b8000f3..c2ffc67f 100644 --- a/channel/wechat_kf/wechat_kf_channel.py +++ b/channel/wechat_kf/wechat_kf_channel.py @@ -16,8 +16,11 @@ Differences from `channel/wechatcom/` (企微自建应用): import io import json import os +import threading import time import xml.etree.ElementTree as ET +from collections import defaultdict +from concurrent.futures import ThreadPoolExecutor from typing import Optional import requests @@ -84,6 +87,19 @@ class WechatKfChannel(ChatChannel): cursor_path = os.path.join("tmp", "wechat_kf_cursors.json") self.cursor_store = CursorStore(cursor_path) + # WeCom requires the callback HTTP response to return within ~5s, + # otherwise it retries the same notification. sync_msg pulling + # can easily exceed that, so we dispatch it to a background pool + # and let `Query.POST` reply success immediately. + self._callback_executor = ThreadPoolExecutor( + max_workers=4, thread_name_prefix="wxkf-cb" + ) + # Per-open_kfid lock: serialize sync_msg for the same kf account + # so that callback retries (or rapid-fire events) don't race on + # the same cursor and produce duplicate replies. + self._kf_locks: dict = defaultdict(threading.Lock) + self._kf_locks_guard = threading.Lock() + # ------------------------------------------------------------------ # Lifecycle # ------------------------------------------------------------------ @@ -110,6 +126,10 @@ class WechatKfChannel(ChatChannel): except Exception as e: logger.warning(f"[wechat_kf] Error stopping HTTP server: {e}") self._http_server = None + try: + self._callback_executor.shutdown(wait=False) + except Exception as e: + logger.warning(f"[wechat_kf] Error shutting down callback executor: {e}") # ------------------------------------------------------------------ # Outbound — implementing the abstract `send` contract @@ -243,6 +263,37 @@ class WechatKfChannel(ChatChannel): # ------------------------------------------------------------------ # Inbound — pull messages by cursor # ------------------------------------------------------------------ + def _get_kf_lock(self, open_kfid: str) -> threading.Lock: + with self._kf_locks_guard: + return self._kf_locks[open_kfid] + + def submit_callback(self, token: str, open_kfid: str): + """ + Async entry point used by the HTTP handler. Submits the actual + sync_msg pulling to a background thread so the callback response + can return within WeCom's 5s deadline. + """ + try: + self._callback_executor.submit(self._run_callback, token, open_kfid) + except RuntimeError as e: + # Executor may be shut down during process exit; fall back + # to inline execution so we don't silently drop the event. + logger.warning(f"[wechat_kf] executor unavailable, run inline: {e}") + self._run_callback(token, open_kfid) + + def _run_callback(self, token: str, open_kfid: str): + # Block on the per-kfid lock so retried callbacks queue up + # behind the in-flight one. The queued worker will then call + # sync_msg with the (already advanced) cursor, which is cheap + # when there is nothing new and still picks up any messages + # that arrived after the previous worker's last pull. + lock = self._get_kf_lock(open_kfid) + with lock: + try: + self.consume_callback(token, open_kfid) + except Exception as e: + logger.exception(f"[wechat_kf] consume_callback error: {e}") + def consume_callback(self, token: str, open_kfid: str): """ Called from the HTTP `Query.POST` handler whenever WeCom notifies @@ -502,8 +553,8 @@ class Query: ) return "success" - try: - channel.consume_callback(token, open_kfid) - except Exception as e: - logger.exception(f"[wechat_kf] consume_callback error: {e}") + # Hand off to a background worker — WeCom requires the callback + # to return success within ~5 seconds, otherwise it will retry + # and we may race the same cursor window into duplicate replies. + channel.submit_callback(token, open_kfid) return "success"