diff --git a/channel/feishu/feishu_channel.py b/channel/feishu/feishu_channel.py index 11d8eb84..3cbb906c 100644 --- a/channel/feishu/feishu_channel.py +++ b/channel/feishu/feishu_channel.py @@ -714,8 +714,6 @@ class FeiShuChanel(ChatChannel): - 创建卡片实体失败(缺权限、网络等)→ 不设置 feishu_streamed 标记,让 chat_channel 走普通文本回复路径,用户收到完整回复但无打字机效果,并打 warning 日志 """ - import time as _time - # 共享状态(受 lock 保护) # 多轮 agent 模式下,每个"中间过场消息"会作为一张独立卡片发送。 # current_text 只承载当前正在流式渲染的那张卡片的内容;message_end / agent_end @@ -723,7 +721,6 @@ class FeiShuChanel(ChatChannel): current_text = [""] # 当前卡片正在累加的 LLM 输出 card_id = [None] # 当前流式卡片的实体 ID(每段独立) message_id = [None] # 当前卡片发送后的消息 ID(仅日志用) - last_update_time = [0.0] # 占位发送是同步进行的,但用一个 in-flight 标记防止并发的多条 message_update # 事件各自触发一次创建+发送,导致发出多张卡片。 init_in_flight = [False] @@ -731,19 +728,62 @@ class FeiShuChanel(ChatChannel): disabled = [False] lock = threading.Lock() + # ---- 异步推送队列 ---------------------------------------------------- + # 同步 requests.put 单次 100~300ms,会阻塞 LLM stream 线程读下一个 chunk。 + # 把推送丢给独立 worker 线程消费 queue,回调本身只做内存追加,立即返回。 + # 队列里只放"最新累积文本"的快照;worker 用 deduplication 避免重复推同一个 + # 内容(高频 chunk 场景下队列会堆积,只推最后一个就够了)。 + import queue as _queue + push_queue: "_queue.Queue[str | None]" = _queue.Queue() + + def _push_worker(): + while True: + snapshot = push_queue.get() + if snapshot is None: + push_queue.task_done() + return + # 合并队列中已堆积的快照:只推最后一个,省 PUT 次数同时降低延迟 + merged_count = 1 + stop = False + while True: + try: + nxt = push_queue.get_nowait() + except _queue.Empty: + break + merged_count += 1 + if nxt is None: + stop = True + break + snapshot = nxt + try: + _stream_update_text(snapshot) + finally: + for _ in range(merged_count): + push_queue.task_done() + if stop: + return + + push_thread = threading.Thread(target=_push_worker, daemon=True, name="feishu-stream-push") + push_thread.start() + + def _drain_push_queue(): + """等当前队列里所有 PUT 都完成。message_end/agent_end 在做最终定型前必须 drain, + 否则 worker 里堆积的旧快照可能在 final_text PUT 之后到达,把最终内容覆盖掉。""" + try: + push_queue.join() + except Exception: + pass + msg = context.get("msg") is_group = context.get("isgroup", False) receiver = context.get("receiver") receive_id_type = context.get("receive_id_type", "open_id") - # 后端推流节流:首个 chunk 立即推(最低首字延迟),之后每 200ms 一波。 - # 客户端按 70ms/字 渲染(约 14 字/秒)是真正的速度瓶颈,再频繁推送也只会 - # 在飞书云端排队,不会让用户感知更快,但会增加一倍以上的 PUT 请求。 - # 飞书 streaming_mode 豁免 10qps 限制,但带宽和 CPU 成本仍是真实开销。 - interval_s = 0.2 - # 客户端打字机渲染参数:飞书默认 step=1(约 14 字/秒)实测偏慢, - # 调成 step=2(约 28 字/秒)更接近 ChatGPT 等同类产品的节奏。 - print_freq_ms = 70 - print_step = 2 + # 客户端打字机渲染参数(飞书 App 侧实际"出字"速度): + # - print_freq_ms:每次刷新的间隔 + # - print_step:每次刷新出多少个字符 + # 当前 40ms × 4 字 ≈ 100 字/秒,接近 ChatGPT/DeepSeek 网页端的节奏。 + print_freq_ms = 40 + print_step = 4 print_strategy = "fast" headers = { @@ -886,33 +926,52 @@ class FeiShuChanel(ChatChannel): except Exception as e: logger.warning(f"[FeiShu] Stream: update text exception: {e}") - def _close_streaming_mode(): - """关闭流式模式(卡片转入"普通"状态,可被转发,摘要不再显示[生成中...])""" + def _close_streaming_mode(final_text: str = ""): + """关闭流式模式(卡片转入"普通"状态,可被转发)。 + + 同时通过整卡更新接口把 summary 改成最终内容的预览,否则飞书会话列表 + 会一直显示创建卡片时的占位摘要("[正在生成回复...]")。 + """ with lock: cid = card_id[0] if not cid: return - url = ( - f"https://open.feishu.cn/open-apis/cardkit/v1/cards/" - f"{cid}/settings" - ) - settings_payload = json.dumps( - {"config": {"streaming_mode": False}}, ensure_ascii=False - ) - body = { - "settings": settings_payload, + + # 1) 通过整卡更新接口把 streaming_mode 关掉,并改写 summary + # (settings 接口的 config 不接受 summary 字段,会报 code=2200) + preview_src = (final_text or "").strip().replace("\n", " ") + preview = preview_src[:30] if preview_src else "" + full_card = { + "schema": "2.0", + "config": { + "streaming_mode": False, + "summary": {"content": preview or " "}, + }, + "body": { + "elements": [ + { + "tag": "markdown", + "content": final_text or " ", + "element_id": ELEMENT_ID, + } + ], + }, + } + put_url = f"https://open.feishu.cn/open-apis/cardkit/v1/cards/{cid}" + put_body = { + "card": {"type": "card_json", "data": json.dumps(full_card, ensure_ascii=False)}, "sequence": _next_sequence(), } try: - res = requests.patch(url, headers=headers, json=body, timeout=(5, 10)) + res = requests.put(put_url, headers=headers, json=put_body, timeout=(5, 10)) res_json = res.json() if res_json.get("code") != 0: logger.warning( - f"[FeiShu] Stream: close streaming_mode failed: {res_json}" + f"[FeiShu] Stream: finalize card (close+summary) failed: {res_json}" ) except Exception as e: logger.warning( - f"[FeiShu] Stream: close streaming_mode exception: {e}" + f"[FeiShu] Stream: finalize card exception: {e}" ) def on_event(event: dict): @@ -943,19 +1002,19 @@ class FeiShuChanel(ChatChannel): if disabled[0]: return - # 第二段:累加当前卡片文本,按节流推送(锁内只读写状态) - should_push = False + # 第二段:累加文本,把快照丢给 push worker 异步推送。 + # 这里不能直接 requests.put,否则会阻塞 LLM stream 线程读下一个 chunk + # (实测 DeepSeek 高频小 chunk 场景每个 PUT ~150ms,累积起来非常卡)。 snapshot = "" + should_push = False with lock: current_text[0] += delta - now = _time.time() - if card_id[0] and (now - last_update_time[0] >= interval_s): - last_update_time[0] = now + if card_id[0]: snapshot = current_text[0] should_push = True if should_push: - _stream_update_text(snapshot) + push_queue.put(snapshot) elif event_type == "message_end": # 一轮 LLM 输出结束。如果本轮触发了工具调用,说明当前轮的文本是 @@ -978,16 +1037,18 @@ class FeiShuChanel(ChatChannel): if not text_to_finalize: return - # 用最终文本覆盖当前卡片并关闭流式模式(凝固成普通卡片) + # 等异步队列里堆积的快照都推完,避免它们晚于 final 文本到达把内容覆盖掉 + _drain_push_queue() + # 用最终文本覆盖当前卡片并关闭流式模式(凝固成普通卡片, + # 同时把会话列表的 summary 改成预览,不再显示"正在生成回复...") _stream_update_text(text_to_finalize) - _close_streaming_mode() + _close_streaming_mode(text_to_finalize) # 重置卡片状态,下一段 message_update 会触发新卡片的创建 with lock: card_id[0] = None message_id[0] = None sequence[0] = 0 - last_update_time[0] = 0.0 elif event_type == "agent_end": # 最终回复:用 final_response 覆盖当前流式卡片,然后关闭流式模式。 @@ -1012,8 +1073,11 @@ class FeiShuChanel(ChatChannel): if disabled[0]: return + _drain_push_queue() _stream_update_text(final_text) - _close_streaming_mode() + _close_streaming_mode(final_text) + # 通知 push worker 退出(本次回复彻底结束) + push_queue.put(None) return on_event