mirror of
https://github.com/zhayujie/chatgpt-on-wechat.git
synced 2026-06-02 00:57:41 +08:00
fix: dispatch callback async to avoid WeCom 5s timeout
WeCom requires the callback HTTP response within ~5s, otherwise it retries the same notification. The previous code ran sync_msg pulling synchronously inside Query.POST, so a backlog could exceed the deadline and trigger retries that race on the same cursor and end up replying to the same user multiple times. - Dispatch consume_callback to a background ThreadPoolExecutor and return 'success' immediately from the HTTP handler. - Serialize work per open_kfid with a lock so retried/concurrent callbacks queue up instead of racing the cursor window. - Shutdown the executor on channel stop().
This commit is contained in:
@@ -16,8 +16,11 @@ Differences from `channel/wechatcom/` (企微自建应用):
|
||||
import io
|
||||
import json
|
||||
import os
|
||||
import threading
|
||||
import time
|
||||
import xml.etree.ElementTree as ET
|
||||
from collections import defaultdict
|
||||
from concurrent.futures import ThreadPoolExecutor
|
||||
from typing import Optional
|
||||
|
||||
import requests
|
||||
@@ -84,6 +87,19 @@ class WechatKfChannel(ChatChannel):
|
||||
cursor_path = os.path.join("tmp", "wechat_kf_cursors.json")
|
||||
self.cursor_store = CursorStore(cursor_path)
|
||||
|
||||
# WeCom requires the callback HTTP response to return within ~5s,
|
||||
# otherwise it retries the same notification. sync_msg pulling
|
||||
# can easily exceed that, so we dispatch it to a background pool
|
||||
# and let `Query.POST` reply success immediately.
|
||||
self._callback_executor = ThreadPoolExecutor(
|
||||
max_workers=4, thread_name_prefix="wxkf-cb"
|
||||
)
|
||||
# Per-open_kfid lock: serialize sync_msg for the same kf account
|
||||
# so that callback retries (or rapid-fire events) don't race on
|
||||
# the same cursor and produce duplicate replies.
|
||||
self._kf_locks: dict = defaultdict(threading.Lock)
|
||||
self._kf_locks_guard = threading.Lock()
|
||||
|
||||
# ------------------------------------------------------------------
|
||||
# Lifecycle
|
||||
# ------------------------------------------------------------------
|
||||
@@ -110,6 +126,10 @@ class WechatKfChannel(ChatChannel):
|
||||
except Exception as e:
|
||||
logger.warning(f"[wechat_kf] Error stopping HTTP server: {e}")
|
||||
self._http_server = None
|
||||
try:
|
||||
self._callback_executor.shutdown(wait=False)
|
||||
except Exception as e:
|
||||
logger.warning(f"[wechat_kf] Error shutting down callback executor: {e}")
|
||||
|
||||
# ------------------------------------------------------------------
|
||||
# Outbound — implementing the abstract `send` contract
|
||||
@@ -243,6 +263,37 @@ class WechatKfChannel(ChatChannel):
|
||||
# ------------------------------------------------------------------
|
||||
# Inbound — pull messages by cursor
|
||||
# ------------------------------------------------------------------
|
||||
def _get_kf_lock(self, open_kfid: str) -> threading.Lock:
|
||||
with self._kf_locks_guard:
|
||||
return self._kf_locks[open_kfid]
|
||||
|
||||
def submit_callback(self, token: str, open_kfid: str):
|
||||
"""
|
||||
Async entry point used by the HTTP handler. Submits the actual
|
||||
sync_msg pulling to a background thread so the callback response
|
||||
can return within WeCom's 5s deadline.
|
||||
"""
|
||||
try:
|
||||
self._callback_executor.submit(self._run_callback, token, open_kfid)
|
||||
except RuntimeError as e:
|
||||
# Executor may be shut down during process exit; fall back
|
||||
# to inline execution so we don't silently drop the event.
|
||||
logger.warning(f"[wechat_kf] executor unavailable, run inline: {e}")
|
||||
self._run_callback(token, open_kfid)
|
||||
|
||||
def _run_callback(self, token: str, open_kfid: str):
|
||||
# Block on the per-kfid lock so retried callbacks queue up
|
||||
# behind the in-flight one. The queued worker will then call
|
||||
# sync_msg with the (already advanced) cursor, which is cheap
|
||||
# when there is nothing new and still picks up any messages
|
||||
# that arrived after the previous worker's last pull.
|
||||
lock = self._get_kf_lock(open_kfid)
|
||||
with lock:
|
||||
try:
|
||||
self.consume_callback(token, open_kfid)
|
||||
except Exception as e:
|
||||
logger.exception(f"[wechat_kf] consume_callback error: {e}")
|
||||
|
||||
def consume_callback(self, token: str, open_kfid: str):
|
||||
"""
|
||||
Called from the HTTP `Query.POST` handler whenever WeCom notifies
|
||||
@@ -502,8 +553,8 @@ class Query:
|
||||
)
|
||||
return "success"
|
||||
|
||||
try:
|
||||
channel.consume_callback(token, open_kfid)
|
||||
except Exception as e:
|
||||
logger.exception(f"[wechat_kf] consume_callback error: {e}")
|
||||
# Hand off to a background worker — WeCom requires the callback
|
||||
# to return success within ~5 seconds, otherwise it will retry
|
||||
# and we may race the same cursor window into duplicate replies.
|
||||
channel.submit_callback(token, open_kfid)
|
||||
return "success"
|
||||
|
||||
Reference in New Issue
Block a user