feat: improve channel connection stability

This commit is contained in:
zhayujie
2026-03-05 15:55:16 +08:00
parent 84c6f31c76
commit f1c04bc60d
6 changed files with 367 additions and 54 deletions

View File

@@ -13,12 +13,38 @@ class Channel(object):
# Identifier of the channel type; the base class leaves it empty
# (presumably overridden by concrete channels — confirm against subclasses).
channel_type = ""
# Reply types listed as not supported: voice and image.
# NOTE(review): presumably consulted before a reply is sent — verify against callers.
NOT_SUPPORT_REPLYTYPE = [ReplyType.VOICE, ReplyType.IMAGE]
def __init__(self):
    """Create the per-instance state used to track the startup result.

    ``_startup_event`` starts unset and ``_startup_error`` starts as None;
    ``cloud_mode`` defaults to False.
    """
    from threading import Event

    # set to True by ChannelManager when running with cloud client
    self.cloud_mode = False
    # No startup error has been recorded yet.
    self._startup_error = None
    # Signalled once a startup result (success or error) has been reported.
    self._startup_event = Event()
def startup(self):
    """Initialise and start the channel.

    The base class provides no implementation and always raises
    NotImplementedError.
    """
    raise NotImplementedError()
def report_startup_success(self):
    """Record a successful startup and signal the startup event."""
    # Clear the error before signalling so a woken waiter never sees a stale error.
    self._startup_error = None
    startup_event = self._startup_event
    startup_event.set()
def report_startup_error(self, error: str):
    """Record a startup failure and signal the startup event.

    :param error: human-readable description of the failure. An empty or
        None value is replaced with a generic message, because the stored
        error doubles as the "startup failed" flag and a falsy value would
        be indistinguishable from success (``str(exc)`` can be empty).
    """
    self._startup_error = str(error) if error else "unknown startup error"
    # Signal only after the error is stored so a woken waiter can read it.
    self._startup_event.set()
def wait_startup(self, timeout: float = 3) -> "tuple[bool, str]":
    """Wait for the channel startup result.

    Blocks until a startup result has been reported or ``timeout`` seconds
    have elapsed, whichever comes first.

    :param timeout: maximum number of seconds to wait for a result.
    :return: ``(success, error_msg)``. ``error_msg`` is empty on success.
        If no result is reported within ``timeout`` the startup is treated
        as successful and ``(True, "")`` is returned.
    """
    if not self._startup_event.wait(timeout=timeout):
        # Nothing reported in time: optimistic result, same as before.
        return True, ""
    # Read the attribute once so a concurrent report cannot change the value
    # between the check and the return.
    error = self._startup_error
    if error is not None:
        # Compare with None rather than truthiness so that a reported but
        # empty error message is still a failure.
        return False, error or "unknown startup error"
    return True, ""
def stop(self):
"""
stop channel gracefully, called before restart

View File

@@ -26,6 +26,7 @@ class ChatChannel(Channel):
user_id = None # 登录的用户id
def __init__(self):
super().__init__()
# Instance-level attributes so each channel subclass has its own
# independent session queue and lock. Previously these were class-level,
# which caused contexts from one channel (e.g. Feishu) to be consumed

View File

@@ -115,6 +115,35 @@ class DingTalkChanel(ChatChannel, dingtalk_stream.ChatbotHandler):
# Robot code cache (extracted from incoming messages)
self._robot_code = None
def _open_connection(self, client):
    """Open a DingTalk stream connection directly, bypassing the SDK's
    internal error-swallowing.

    :param client: stream client whose ``credential.client_id`` and
        ``credential.client_secret`` are sent to the gateway.
    :return: ``(connection_dict, error_str)``. On success ``error_str`` is
        empty and ``connection_dict`` contains ``endpoint`` and ``ticket``;
        on failure ``connection_dict`` is None and ``error_str`` contains a
        human-readable message. This method does not raise.
    """
    try:
        resp = requests.post(
            "https://api.dingtalk.com/v1.0/gateway/connections/open",
            headers={"Content-Type": "application/json", "Accept": "application/json"},
            json={
                "clientId": client.credential.client_id,
                "clientSecret": client.credential.client_secret,
                "subscriptions": [{"type": "CALLBACK",
                                   "topic": dingtalk_stream.chatbot.ChatbotMessage.TOPIC}],
                "ua": "dingtalk-sdk-python/cow",
                "localIp": "",
            },
            timeout=10,
        )
        try:
            body = resp.json()
        except ValueError:
            # Non-JSON payload (e.g. an HTML error page from a proxy): fall
            # through so the HTTP status is reported rather than a bare
            # JSON decoding error.
            body = None

        if not resp.ok:
            details = body if isinstance(body, dict) else {}
            code = details.get("code", resp.status_code)
            message = details.get("message", resp.reason)
            return None, f"open connection failed: [{code}] {message}"

        # The caller indexes body['endpoint'] and body['ticket'] directly,
        # so reject a malformed success response here.
        if not isinstance(body, dict) or not body.get("endpoint") or not body.get("ticket"):
            return None, "open connection failed: response is missing endpoint or ticket"
        return body, ""
    except Exception as e:
        # Boundary handler: network errors, bad credentials object, etc. are
        # reported as a message so the caller's retry loop keeps running.
        return None, f"open connection failed: {e}"
def startup(self):
import asyncio
self.dingtalk_client_id = conf().get('dingtalk_client_id')
@@ -125,34 +154,80 @@ class DingTalkChanel(ChatChannel, dingtalk_stream.ChatbotHandler):
self._stream_client = client
client.register_callback_handler(dingtalk_stream.chatbot.ChatbotMessage.TOPIC, self)
logger.info("[DingTalk] ✅ Stream client initialized, ready to receive messages")
# Run the connection loop ourselves instead of delegating to client.start(),
# so we can get detailed error messages and respond to stop() quickly.
import urllib.parse as _urlparse
import websockets as _ws
import json as _json
client.pre_start()
_first_connect = True
while self._running:
# Open connection using our own request so we get detailed error info.
connection, err_msg = self._open_connection(client)
if connection is None:
if _first_connect:
logger.warning(f"[DingTalk] {err_msg}")
self.report_startup_error(err_msg)
_first_connect = False
else:
logger.warning(f"[DingTalk] {err_msg}, retrying in 10s...")
# Interruptible sleep: checks _running every 100ms.
for _ in range(100):
if not self._running:
break
time.sleep(0.1)
continue
if _first_connect:
logger.info("[DingTalk] ✅ Connected to DingTalk stream")
self.report_startup_success()
_first_connect = False
else:
logger.info("[DingTalk] Reconnected to DingTalk stream")
# Run the WebSocket session in an asyncio loop.
uri = '%s?ticket=%s' % (
connection['endpoint'],
_urlparse.quote_plus(connection['ticket'])
)
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
self._event_loop = loop
try:
if not _first_connect:
logger.info("[DingTalk] Reconnecting...")
_first_connect = False
loop.run_until_complete(client.start())
async def _session():
async with _ws.connect(uri) as websocket:
client.websocket = websocket
async for raw_message in websocket:
json_message = _json.loads(raw_message)
result = await client.route_message(json_message)
if result == dingtalk_stream.DingTalkStreamClient.TAG_DISCONNECT:
break
loop.run_until_complete(_session())
except (KeyboardInterrupt, SystemExit):
logger.info("[DingTalk] Startup loop received stop signal, exiting")
logger.info("[DingTalk] Session loop received stop signal, exiting")
break
except Exception as e:
if not self._running:
break
logger.warning(f"[DingTalk] Stream connection error: {e}, reconnecting in 3s...")
time.sleep(3)
logger.warning(f"[DingTalk] Stream session error: {e}, reconnecting in 3s...")
for _ in range(30):
if not self._running:
break
time.sleep(0.1)
finally:
self._event_loop = None
try:
loop.close()
except Exception:
pass
logger.info("[DingTalk] Startup loop exited")
def stop(self):
import asyncio
logger.info("[DingTalk] stop() called, setting _running=False")
self._running = False
loop = self._event_loop

View File

@@ -169,10 +169,20 @@ class FeiShuChanel(ChatChannel):
context.verify_mode = ssl.CERT_NONE
return context
# Give this thread its own event loop so lark SDK can call run_until_complete
# lark_oapi.ws.client captures the event loop at module-import time as a module-
# level global variable. When a previous ws thread is force-killed via ctypes its
# loop may still be marked as "running", which causes the next ws_client.start()
# call (in this new thread) to raise "This event loop is already running".
# Fix: replace the module-level loop with a brand-new, idle loop before starting.
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
try:
import lark_oapi.ws.client as _lark_ws_client_mod
_lark_ws_client_mod.loop = loop
except Exception:
pass
startup_error = None
for attempt in range(2):
try:
if attempt == 1:
@@ -202,8 +212,11 @@ class FeiShuChanel(ChatChannel):
logger.warning(f"[FeiShu] SSL error: {error_msg}, retrying...")
continue
logger.error(f"[FeiShu] Websocket client error: {e}", exc_info=True)
startup_error = error_msg
ssl_module.create_default_context = original_create_default_context
break
if startup_error:
self.report_startup_error(startup_error)
try:
loop.close()
except Exception: