feat(feishu): enhance #2791 with cardkit streaming + ASR fixes

- rewrite streaming reply to official cardkit v2.0 API (default on, auto-fallback)
- fix Whisper hallucination: bump ASR sample rate to 16k, pass language=zh
- fix lock-over-IO and tmp file cleanup from #2791
- drop deprecated feishu_bot_name; quiet unknown-key warnings
- docs: cardkit permission and feishu_stream_reply usage
This commit is contained in:
zhayujie
2026-05-05 14:15:25 +08:00
parent 5080051e39
commit b80c3fe5a8
9 changed files with 304 additions and 109 deletions

View File

@@ -507,109 +507,283 @@ class FeiShuChanel(ChatChannel):
def _make_feishu_stream_callback(self, context, access_token):
"""
Create an on_event callback for streaming text replies via Feishu message edit API.
First message_update event triggers a placeholder send to get a message_id;
subsequent events throttle-update that message in place.
agent_end does a final update and marks context["feishu_streamed"] = True
to suppress the duplicate send() call from chat_channel.
基于飞书官方"流式更新卡片"API 实现打字机回复。
流程:
1. message_update 首次到达 → POST /cardkit/v1/cards 创建带 streaming_mode 的卡片实体,
随后用 POST /im/v1/messages或 reply以 card_id 把卡片发出去
2. 后续 message_update → PUT /cardkit/v1/cards/{id}/elements/{eid}/content
传入"当前轮"的全量文本,飞书平台自动计算增量并以打字机效果上屏
(流式模式下不受 10 QPS 限制)
3. message_end一轮 LLM 输出结束,且本轮触发了工具调用)→ 把 current 累计到 committed
并加入分隔符;下一轮 message_update 又从空白开始,避免多轮内容串到一起
4. agent_end → 用 final_response 强制覆盖卡片,再 PATCH /cardkit/v1/cards/{id}/settings
关闭 streaming_mode标记 context["feishu_streamed"]=True 让 chat_channel 跳过普通 send()
前提条件:
- 机器人已开通 cardkit:card:write 权限
- 飞书客户端 7.20+
失败降级:
- 创建卡片实体失败(缺权限、网络等)→ 不设置 feishu_streamed 标记,让 chat_channel
走普通文本回复路径,用户收到完整回复但无打字机效果,并打 warning 日志
"""
import time as _time
import threading as _threading
streamed_text = [""]
message_id = [None]
# 共享状态(受 lock 保护)
committed_text = [""] # 已结束轮次的累积内容(含分隔符)
current_text = [""] # 当前轮 LLM 输出的累积内容
card_id = [None] # 创建出来的卡片实体 ID
message_id = [None] # 卡片发送后的消息 ID仅日志用
last_update_time = [0.0]
lock = _threading.Lock()
# 占位发送是同步进行的,但用一个 in-flight 标记防止并发的多条 message_update
# 事件各自触发一次创建+发送,导致发出多张卡片。
init_in_flight = [False]
# 一旦初始化失败就长期标记为 disabled本次回复不再尝试任何流式调用
disabled = [False]
lock = threading.Lock()
msg = context.get("msg")
is_group = context.get("isgroup", False)
receiver = context.get("receiver")
receive_id_type = context.get("receive_id_type", "open_id")
interval_s = conf().get("feishu_stream_interval_ms", 500) / 1000.0
# 后端推流间隔与客户端打字机渲染参数:飞书原生 streaming_config 默认值经验证
# 已能在大部分场景下取得平滑的打字机效果,无需暴露给用户配置。
interval_s = 0.3
print_freq_ms = 70
print_step = 2
print_strategy = "fast"
headers = {
"Authorization": "Bearer " + access_token,
"Content-Type": "application/json",
"Content-Type": "application/json; charset=utf-8",
}
# 卡片中富文本组件的 element_id后续所有 PUT 流式更新都打到这个组件
ELEMENT_ID = "stream_md"
# 操作序号,每次 PUT 必须严格递增(飞书要求)
sequence = [0]
def _make_card(text):
# update_multi=true is required by Feishu for PATCH updates to be visible to all recipients.
def _next_sequence():
sequence[0] += 1
return sequence[0]
def _build_card_json():
"""卡片 JSON 2.0 结构 + streaming_mode + 单 markdown 组件"""
return json.dumps({
"config": {"update_multi": True},
"elements": [
{
"tag": "div",
"text": {"tag": "lark_md", "content": text},
}
],
})
"schema": "2.0",
"config": {
"streaming_mode": True,
"summary": {"content": "[正在生成回复...]"},
"streaming_config": {
"print_frequency_ms": {"default": print_freq_ms},
"print_step": {"default": print_step},
"print_strategy": print_strategy,
},
},
"body": {
"elements": [
{
"tag": "markdown",
"content": "...",
"element_id": ELEMENT_ID,
}
],
},
# 注意JSON 2.0 不支持自定义 fallback 字段(传入会报错)。
# 客户端 < 7.20 时,飞书会自动展示"请升级客户端"占位,无需配置。
}, ensure_ascii=False)
def _send_placeholder():
can_reply = is_group and msg and hasattr(msg, 'msg_id') and msg.msg_id
placeholder_content = _make_card("...")
def _create_and_send_card():
"""同步执行:创建卡片实体 → 发送消息。任意一步失败则 disabled=True 触发降级"""
try:
if can_reply:
url = f"https://open.feishu.cn/open-apis/im/v1/messages/{msg.msg_id}/reply"
data = {"msg_type": "interactive", "content": placeholder_content}
res = requests.post(url=url, headers=headers, json=data, timeout=(5, 10))
else:
url = "https://open.feishu.cn/open-apis/im/v1/messages"
params = {"receive_id_type": receive_id_type}
data = {
"receive_id": receiver,
"msg_type": "interactive",
"content": placeholder_content,
}
res = requests.post(url=url, headers=headers, params=params, json=data, timeout=(5, 10))
res_json = res.json()
if res_json.get("code") == 0:
mid = res_json["data"]["message_id"]
message_id[0] = mid
logger.info(f"[FeiShu] Stream: placeholder sent, message_id={mid}")
else:
logger.error(f"[FeiShu] Stream: placeholder failed: {res_json}")
except Exception as e:
logger.error(f"[FeiShu] Stream: placeholder exception: {e}")
def _patch_message(text):
if not message_id[0]:
return
# PATCH updates interactive (card) messages; no edit-count limit per Feishu docs.
url = f"https://open.feishu.cn/open-apis/im/v1/messages/{message_id[0]}"
data = {"content": _make_card(text)}
try:
res = requests.patch(url=url, headers=headers, json=data, timeout=(5, 10))
# 步骤 1: 创建卡片实体
create_url = "https://open.feishu.cn/open-apis/cardkit/v1/cards"
create_body = {"type": "card_json", "data": _build_card_json()}
res = requests.post(
create_url, headers=headers, json=create_body, timeout=(5, 10)
)
res_json = res.json()
if res_json.get("code") != 0:
logger.warning(f"[FeiShu] Stream: patch failed: {res_json}")
logger.warning(
f"[FeiShu] Stream: create card failed "
f"(code={res_json.get('code')}, msg={res_json.get('msg')}). "
f"本次回复已自动降级为普通文本回复(一次性返回完整内容)。"
f"如需开启流式打字机效果与完整 Markdown 渲染,请到飞书开放平台 "
f"https://open.feishu.cn/app 给机器人开通 cardkit:card:write 权限"
f"(创建与更新卡片)并重新发布版本,同时确保飞书客户端 >= 7.20。"
)
with lock:
disabled[0] = True
return
cid = res_json["data"]["card_id"]
with lock:
card_id[0] = cid
# 步骤 2: 通过 card_id 发送消息(群聊优先用 reply单聊直接 send
content_payload = json.dumps(
{"type": "card", "data": {"card_id": cid}}, ensure_ascii=False
)
can_reply = is_group and msg and hasattr(msg, "msg_id") and msg.msg_id
if can_reply:
send_url = (
f"https://open.feishu.cn/open-apis/im/v1/messages/"
f"{msg.msg_id}/reply"
)
send_body = {"msg_type": "interactive", "content": content_payload}
send_res = requests.post(
send_url, headers=headers, json=send_body, timeout=(5, 10)
)
else:
send_url = "https://open.feishu.cn/open-apis/im/v1/messages"
params = {"receive_id_type": receive_id_type}
send_body = {
"receive_id": receiver,
"msg_type": "interactive",
"content": content_payload,
}
send_res = requests.post(
send_url, headers=headers, params=params, json=send_body,
timeout=(5, 10),
)
send_json = send_res.json()
if send_json.get("code") != 0:
logger.warning(
f"[FeiShu] Stream: send card failed: {send_json}. 降级为普通文本。"
)
with lock:
disabled[0] = True
return
mid = send_json["data"]["message_id"]
with lock:
message_id[0] = mid
logger.info(
f"[FeiShu] Stream: card created and sent, "
f"card_id={cid}, message_id={mid}"
)
except Exception as e:
logger.warning(f"[FeiShu] Stream: patch exception: {e}")
logger.warning(
f"[FeiShu] Stream: create/send card exception: {e}. 降级为普通文本。"
)
with lock:
disabled[0] = True
finally:
with lock:
init_in_flight[0] = False
def _stream_update_text(full_text):
"""PUT 流式更新文本组件。content 必须是当前组件的全量文本。"""
with lock:
cid = card_id[0]
if not cid:
return
url = (
f"https://open.feishu.cn/open-apis/cardkit/v1/cards/"
f"{cid}/elements/{ELEMENT_ID}/content"
)
body = {
"content": full_text,
"sequence": _next_sequence(),
}
try:
res = requests.put(url, headers=headers, json=body, timeout=(5, 10))
res_json = res.json()
if res_json.get("code") != 0:
logger.warning(
f"[FeiShu] Stream: update text failed: {res_json}"
)
except Exception as e:
logger.warning(f"[FeiShu] Stream: update text exception: {e}")
def _close_streaming_mode():
"""关闭流式模式(卡片转入"普通"状态,可被转发,摘要不再显示[生成中...]"""
with lock:
cid = card_id[0]
if not cid:
return
url = (
f"https://open.feishu.cn/open-apis/cardkit/v1/cards/"
f"{cid}/settings"
)
settings_payload = json.dumps(
{"config": {"streaming_mode": False}}, ensure_ascii=False
)
body = {
"settings": settings_payload,
"sequence": _next_sequence(),
}
try:
res = requests.patch(url, headers=headers, json=body, timeout=(5, 10))
res_json = res.json()
if res_json.get("code") != 0:
logger.warning(
f"[FeiShu] Stream: close streaming_mode failed: {res_json}"
)
except Exception as e:
logger.warning(
f"[FeiShu] Stream: close streaming_mode exception: {e}"
)
def on_event(event: dict):
event_type = event.get("type")
data = event.get("data", {})
# 一旦降级,本次回复不再做任何流式操作
with lock:
if disabled[0]:
return
if event_type == "message_update":
delta = data.get("delta", "")
if not delta:
return
# 第一段:判断是否需要初始化(创建卡片 + 发送)
need_init = False
with lock:
# 在锁内发送占位消息,防止并发的 message_update 事件各自触发一次占位发送,
# 导致发出多条 "..." 占位消息。
if message_id[0] is None:
_send_placeholder()
streamed_text[0] += delta
if card_id[0] is None and not init_in_flight[0]:
init_in_flight[0] = True
need_init = True
if need_init:
_create_and_send_card()
# 初始化失败已标记 disabled下次循环直接 return
with lock:
if disabled[0]:
return
# 第二段:累加当前轮文本,按节流推送(锁内只读写状态)
should_push = False
snapshot = ""
with lock:
current_text[0] += delta
now = _time.time()
if now - last_update_time[0] >= interval_s:
if card_id[0] and (now - last_update_time[0] >= interval_s):
last_update_time[0] = now
_patch_message(streamed_text[0])
snapshot = committed_text[0] + current_text[0]
should_push = True
if should_push:
_stream_update_text(snapshot)
elif event_type == "message_end":
# 一轮 LLM 输出结束。如果本轮触发了工具调用,把当前轮内容定型到 committed
# 并加分隔符否则当前轮就是最终内容agent_end 会处理)。
tool_calls = data.get("tool_calls", []) or []
if tool_calls:
with lock:
if current_text[0].strip():
committed_text[0] += current_text[0].rstrip() + "\n\n---\n\n"
current_text[0] = ""
elif event_type == "agent_end":
# 用 final_response 强制覆盖整张卡片:丢弃中间累积,避免拼接错误。
final_response = data.get("final_response", "")
with lock:
if message_id[0] and final_response:
_patch_message(str(final_response))
context["feishu_streamed"] = True
if final_response:
final_text = str(final_response)
# 标记 streamed 让 chat_channel 跳过 send()
context["feishu_streamed"] = True
_stream_update_text(final_text)
_close_streaming_mode()
return on_event
@@ -864,12 +1038,6 @@ class FeiShuChanel(ChatChannel):
f"[FeiShu] upload audio response, status={upload_response.status_code}, res={upload_response.content}")
response_data = upload_response.json()
if response_data.get("code") == 0:
# 若进行了格式转换,上传成功后清理临时 opus 文件,避免磁盘堆积
if upload_path != audio_path and os.path.exists(upload_path):
try:
os.remove(upload_path)
except Exception as e:
logger.warning(f"[FeiShu] Failed to remove temp opus file {upload_path}: {e}")
return response_data.get("data").get("file_key")
else:
logger.error(f"[FeiShu] upload audio failed: {response_data}")
@@ -877,6 +1045,13 @@ class FeiShuChanel(ChatChannel):
except Exception as e:
logger.error(f"[FeiShu] upload audio exception: {e}")
return None
finally:
# 无论上传成功与否都清理转换产生的临时 opus 文件,避免失败路径下磁盘堆积。
if upload_path != audio_path and os.path.exists(upload_path):
try:
os.remove(upload_path)
except Exception as e:
logger.warning(f"[FeiShu] Failed to remove temp opus file {upload_path}: {e}")
def _upload_file_url(self, file_url, access_token):
"""