fix(feishu): unblock streaming via async push worker

This commit is contained in:
zhayujie
2026-05-05 19:36:15 +08:00
parent a9c8a59f58
commit 1065c7e722

View File

@@ -714,8 +714,6 @@ class FeiShuChanel(ChatChannel):
- 创建卡片实体失败(缺权限、网络等)→ 不设置 feishu_streamed 标记,让 chat_channel
走普通文本回复路径,用户收到完整回复但无打字机效果,并打 warning 日志
"""
import time as _time
# 共享状态(受 lock 保护)
# 多轮 agent 模式下,每个"中间过场消息"会作为一张独立卡片发送。
# current_text 只承载当前正在流式渲染的那张卡片的内容message_end / agent_end
@@ -723,7 +721,6 @@ class FeiShuChanel(ChatChannel):
current_text = [""] # 当前卡片正在累加的 LLM 输出
card_id = [None] # 当前流式卡片的实体 ID每段独立
message_id = [None] # 当前卡片发送后的消息 ID仅日志用
last_update_time = [0.0]
# 占位发送是同步进行的,但用一个 in-flight 标记防止并发的多条 message_update
# 事件各自触发一次创建+发送,导致发出多张卡片。
init_in_flight = [False]
@@ -731,19 +728,62 @@ class FeiShuChanel(ChatChannel):
disabled = [False]
lock = threading.Lock()
# ---- 异步推送队列 ----------------------------------------------------
# 同步 requests.put 单次 100~300ms会阻塞 LLM stream 线程读下一个 chunk。
# 把推送丢给独立 worker 线程消费 queue回调本身只做内存追加立即返回。
# 队列里只放"最新累积文本"的快照worker 用 deduplication 避免重复推同一个
# 内容(高频 chunk 场景下队列会堆积,只推最后一个就够了)。
import queue as _queue
push_queue: "_queue.Queue[str | None]" = _queue.Queue()
def _push_worker():
while True:
snapshot = push_queue.get()
if snapshot is None:
push_queue.task_done()
return
# 合并队列中已堆积的快照:只推最后一个,省 PUT 次数同时降低延迟
merged_count = 1
stop = False
while True:
try:
nxt = push_queue.get_nowait()
except _queue.Empty:
break
merged_count += 1
if nxt is None:
stop = True
break
snapshot = nxt
try:
_stream_update_text(snapshot)
finally:
for _ in range(merged_count):
push_queue.task_done()
if stop:
return
push_thread = threading.Thread(target=_push_worker, daemon=True, name="feishu-stream-push")
push_thread.start()
def _drain_push_queue():
"""等当前队列里所有 PUT 都完成。message_end/agent_end 在做最终定型前必须 drain
否则 worker 里堆积的旧快照可能在 final_text PUT 之后到达,把最终内容覆盖掉。"""
try:
push_queue.join()
except Exception:
pass
msg = context.get("msg")
is_group = context.get("isgroup", False)
receiver = context.get("receiver")
receive_id_type = context.get("receive_id_type", "open_id")
# 后端推流节流:首个 chunk 立即推(最低首字延迟),之后每 200ms 一波。
# 客户端按 70ms/字 渲染(约 14 字/秒)是真正的速度瓶颈,再频繁推送也只会
# 在飞书云端排队,不会让用户感知更快,但会增加一倍以上的 PUT 请求。
# 飞书 streaming_mode 豁免 10qps 限制,但带宽和 CPU 成本仍是真实开销
interval_s = 0.2
# 客户端打字机渲染参数:飞书默认 step=1约 14 字/秒)实测偏慢,
# 调成 step=2约 28 字/秒)更接近 ChatGPT 等同类产品的节奏。
print_freq_ms = 70
print_step = 2
# 客户端打字机渲染参数(飞书 App 侧实际"出字"速度):
# - print_freq_ms每次刷新的间隔
# - print_step每次刷新出多少个字符
# 当前 40ms × 4 字 ≈ 100 字/秒,接近 ChatGPT/DeepSeek 网页端的节奏
print_freq_ms = 40
print_step = 4
print_strategy = "fast"
headers = {
@@ -886,33 +926,52 @@ class FeiShuChanel(ChatChannel):
except Exception as e:
logger.warning(f"[FeiShu] Stream: update text exception: {e}")
def _close_streaming_mode():
"""关闭流式模式(卡片转入"普通"状态,可被转发,摘要不再显示[生成中...]"""
def _close_streaming_mode(final_text: str = ""):
"""关闭流式模式(卡片转入"普通"状态,可被转发)。
同时通过整卡更新接口把 summary 改成最终内容的预览,否则飞书会话列表
会一直显示创建卡片时的占位摘要("[正在生成回复...]")。
"""
with lock:
cid = card_id[0]
if not cid:
return
url = (
f"https://open.feishu.cn/open-apis/cardkit/v1/cards/"
f"{cid}/settings"
)
settings_payload = json.dumps(
{"config": {"streaming_mode": False}}, ensure_ascii=False
)
body = {
"settings": settings_payload,
# 1) 通过整卡更新接口把 streaming_mode 关掉,并改写 summary
# settings 接口的 config 不接受 summary 字段,会报 code=2200
preview_src = (final_text or "").strip().replace("\n", " ")
preview = preview_src[:30] if preview_src else ""
full_card = {
"schema": "2.0",
"config": {
"streaming_mode": False,
"summary": {"content": preview or " "},
},
"body": {
"elements": [
{
"tag": "markdown",
"content": final_text or " ",
"element_id": ELEMENT_ID,
}
],
},
}
put_url = f"https://open.feishu.cn/open-apis/cardkit/v1/cards/{cid}"
put_body = {
"card": {"type": "card_json", "data": json.dumps(full_card, ensure_ascii=False)},
"sequence": _next_sequence(),
}
try:
res = requests.patch(url, headers=headers, json=body, timeout=(5, 10))
res = requests.put(put_url, headers=headers, json=put_body, timeout=(5, 10))
res_json = res.json()
if res_json.get("code") != 0:
logger.warning(
f"[FeiShu] Stream: close streaming_mode failed: {res_json}"
f"[FeiShu] Stream: finalize card (close+summary) failed: {res_json}"
)
except Exception as e:
logger.warning(
f"[FeiShu] Stream: close streaming_mode exception: {e}"
f"[FeiShu] Stream: finalize card exception: {e}"
)
def on_event(event: dict):
@@ -943,19 +1002,19 @@ class FeiShuChanel(ChatChannel):
if disabled[0]:
return
# 第二段:累加当前卡片文本,按节流推送(锁内只读写状态)
should_push = False
# 第二段:累加文本,把快照丢给 push worker 异步推送。
# 这里不能直接 requests.put否则会阻塞 LLM stream 线程读下一个 chunk
# (实测 DeepSeek 高频小 chunk 场景每个 PUT ~150ms累积起来非常卡
snapshot = ""
should_push = False
with lock:
current_text[0] += delta
now = _time.time()
if card_id[0] and (now - last_update_time[0] >= interval_s):
last_update_time[0] = now
if card_id[0]:
snapshot = current_text[0]
should_push = True
if should_push:
_stream_update_text(snapshot)
push_queue.put(snapshot)
elif event_type == "message_end":
# 一轮 LLM 输出结束。如果本轮触发了工具调用,说明当前轮的文本是
@@ -978,16 +1037,18 @@ class FeiShuChanel(ChatChannel):
if not text_to_finalize:
return
# 用最终文本覆盖当前卡片并关闭流式模式(凝固成普通卡片)
# 等异步队列里堆积的快照都推完,避免它们晚于 final 文本到达把内容覆盖掉
_drain_push_queue()
# 用最终文本覆盖当前卡片并关闭流式模式(凝固成普通卡片,
# 同时把会话列表的 summary 改成预览,不再显示"正在生成回复..."
_stream_update_text(text_to_finalize)
_close_streaming_mode()
_close_streaming_mode(text_to_finalize)
# 重置卡片状态,下一段 message_update 会触发新卡片的创建
with lock:
card_id[0] = None
message_id[0] = None
sequence[0] = 0
last_update_time[0] = 0.0
elif event_type == "agent_end":
# 最终回复:用 final_response 覆盖当前流式卡片,然后关闭流式模式。
@@ -1012,8 +1073,11 @@ class FeiShuChanel(ChatChannel):
if disabled[0]:
return
_drain_push_queue()
_stream_update_text(final_text)
_close_streaming_mode()
_close_streaming_mode(final_text)
# 通知 push worker 退出(本次回复彻底结束)
push_queue.put(None)
return on_event