feat(feishu): add voice message support and streaming text reply

- Receive audio messages: map msg_type=audio to ContextType.VOICE and
    download opus file via lazy _prepare_fn for STT pipeline
  - Send voice replies: upload opus audio via Feishu file API, auto-convert
    non-opus formats (e.g. mp3) using pydub before upload
  - Streaming text reply: inject on_event callback into context; send a card
    placeholder on first delta, then PATCH-update it in-place at a
    configurable interval (feishu_stream_interval_ms) to achieve typewriter
    effect; set feishu_streamed=True to suppress duplicate send()
  - Enable NOT_SUPPORT_REPLYTYPE=[] to unblock voice and image reply types
  - Fix AudioSegment mutation bug in audio_convert.py: set_frame_rate /
    set_channels return new objects and must be reassigned
  - Add -nostdin to ffmpeg invocation to prevent stdin deadlock in daemon
  - Add feishu_bot_name, feishu_stream_reply, feishu_stream_interval_ms
    config keys to config-template.json
This commit is contained in:
ooaaooaa123
2026-04-30 16:14:57 +08:00
parent c9c99de3d9
commit 4d1f059c0d
4 changed files with 235 additions and 4 deletions

View File

@@ -61,6 +61,10 @@ class FeiShuChanel(ChatChannel):
feishu_app_secret = conf().get('feishu_app_secret')
feishu_token = conf().get('feishu_token')
feishu_event_mode = conf().get('feishu_event_mode', 'websocket') # webhook 或 websocket
# 覆盖父类默认值 [ReplyType.VOICE, ReplyType.IMAGE]。
# 飞书原生支持发送音频(opus 格式,通过文件上传接口)和图片,
# 所有回复类型均已处理,置为空列表以启用语音和图片回复。
NOT_SUPPORT_REPLYTYPE = []
def __init__(self):
super().__init__()
@@ -384,10 +388,20 @@ class FeiShuChanel(ChatChannel):
no_need_at=True
)
if context:
# 流式回复模式:向 context 注入 on_event 回调,agent 每产出一段文字时会调用它。
# 回调内部先发送一条占位消息获取 message_id,之后通过 PATCH 接口原地更新内容,
# 实现打字机效果。回调结束时设置 context["feishu_streamed"]=True,
# 让 send() 跳过重复发送,避免最终完整回复再被重复投递一次。
if conf().get("feishu_stream_reply", False):
context["on_event"] = self._make_feishu_stream_callback(context, feishu_msg.access_token)
self.produce(context)
logger.debug(f"[FeiShu] query={feishu_msg.content}, type={feishu_msg.ctype}")
def send(self, reply: Reply, context: Context):
# 如果文本回复已通过流式传输发送,则跳过重复发送
if reply.type == ReplyType.TEXT and context.get("feishu_streamed"):
logger.debug("[FeiShu] streaming already delivered text reply, skipping send()")
return
msg = context.get("msg")
is_group = context["isgroup"]
if msg:
@@ -450,6 +464,16 @@ class FeiShuChanel(ChatChannel):
msg_type = "file"
content_key = "file_key"
elif reply.type == ReplyType.VOICE:
# 语音回复:上传音频文件到飞书,然后发送 audio 类型消息
file_key = self._upload_audio(reply.content, access_token)
if not file_key:
logger.warning("[FeiShu] upload audio failed")
return
reply_content = file_key
msg_type = "audio"
content_key = "file_key"
# Check if we can reply to an existing message (need msg_id)
can_reply = is_group and msg and hasattr(msg, 'msg_id') and msg.msg_id
@@ -481,6 +505,114 @@ class FeiShuChanel(ChatChannel):
else:
logger.error(f"[FeiShu] send message failed, code={res.get('code')}, msg={res.get('msg')}")
def _make_feishu_stream_callback(self, context, access_token):
    """
    Build an ``on_event`` callback that streams a text reply into one Feishu card.

    The first non-empty ``message_update`` delta sends a placeholder card and
    records its message_id; later deltas PATCH that card in place, at most once
    per ``feishu_stream_interval_ms``. ``agent_end`` performs one final update.

    ``context["feishu_streamed"]`` is set to True only when the card exists and
    the final update succeeded. In every other case (placeholder failed, no
    delta was ever received, final PATCH failed) the flag is left unset so that
    ``send()`` still delivers the reply through the normal path instead of
    silently dropping it.

    :param context: message context; read for "msg", "isgroup", "receiver" and
        "receive_id_type", written with "feishu_streamed" on success.
    :param access_token: token placed in the Authorization header.
    :return: callable ``on_event(event: dict)``.
    """
    import time as _time
    import threading as _threading

    # One-element lists act as mutable cells shared by the nested closures.
    streamed_text = [""]
    message_id = [None]
    last_update_time = [0.0]
    placeholder_tried = [False]
    lock = _threading.Lock()

    msg = context.get("msg")
    is_group = context.get("isgroup", False)
    receiver = context.get("receiver")
    receive_id_type = context.get("receive_id_type", "open_id")
    interval_s = conf().get("feishu_stream_interval_ms", 500) / 1000.0

    headers = {
        "Authorization": "Bearer " + access_token,
        "Content-Type": "application/json",
    }

    def _make_card(text):
        # update_multi=true is required by Feishu for PATCH updates to be visible to all recipients.
        return json.dumps({
            "config": {"update_multi": True},
            "elements": [
                {
                    "tag": "div",
                    "text": {"tag": "lark_md", "content": text},
                }
            ],
        })

    def _send_placeholder():
        """Send the initial "..." card and remember its message_id on success."""
        can_reply = is_group and msg and hasattr(msg, 'msg_id') and msg.msg_id
        placeholder_content = _make_card("...")
        try:
            if can_reply:
                url = f"https://open.feishu.cn/open-apis/im/v1/messages/{msg.msg_id}/reply"
                data = {"msg_type": "interactive", "content": placeholder_content}
                res = requests.post(url=url, headers=headers, json=data, timeout=(5, 10))
            else:
                url = "https://open.feishu.cn/open-apis/im/v1/messages"
                params = {"receive_id_type": receive_id_type}
                data = {
                    "receive_id": receiver,
                    "msg_type": "interactive",
                    "content": placeholder_content,
                }
                res = requests.post(url=url, headers=headers, params=params, json=data, timeout=(5, 10))
            res_json = res.json()
            if res_json.get("code") == 0:
                mid = res_json["data"]["message_id"]
                message_id[0] = mid
                logger.info(f"[FeiShu] Stream: placeholder sent, message_id={mid}")
            else:
                logger.error(f"[FeiShu] Stream: placeholder failed: {res_json}")
        except Exception as e:
            logger.error(f"[FeiShu] Stream: placeholder exception: {e}")

    def _patch_message(text):
        """PATCH the card with ``text``; return True only if Feishu accepted it."""
        if not message_id[0]:
            return False
        # PATCH updates interactive (card) messages; no edit-count limit per Feishu docs.
        url = f"https://open.feishu.cn/open-apis/im/v1/messages/{message_id[0]}"
        data = {"content": _make_card(text)}
        try:
            res = requests.patch(url=url, headers=headers, json=data, timeout=(5, 10))
            res_json = res.json()
            if res_json.get("code") != 0:
                logger.warning(f"[FeiShu] Stream: patch failed: {res_json}")
                return False
            return True
        except Exception as e:
            logger.warning(f"[FeiShu] Stream: patch exception: {e}")
            return False

    def on_event(event: dict):
        event_type = event.get("type")
        data = event.get("data") or {}

        if event_type == "message_update":
            delta = data.get("delta", "")
            if not delta:
                return
            with lock:
                # The placeholder is sent inside the lock so concurrent deltas
                # cannot each emit their own "..." card. It is attempted once:
                # retrying on every delta would issue a blocking HTTP request
                # per token while holding the lock.
                if not placeholder_tried[0]:
                    placeholder_tried[0] = True
                    _send_placeholder()
                streamed_text[0] += delta
                if message_id[0] is None:
                    # No card to update; the reply will go out via send().
                    return
                now = _time.time()
                if now - last_update_time[0] >= interval_s:
                    last_update_time[0] = now
                    _patch_message(streamed_text[0])

        elif event_type == "agent_end":
            final_response = data.get("final_response", "")
            with lock:
                # Fall back to the accumulated deltas so text held back by the
                # throttle is still flushed when no final_response is supplied.
                final_text = str(final_response) if final_response else streamed_text[0]
                if message_id[0] and final_text and _patch_message(final_text):
                    context["feishu_streamed"] = True
                else:
                    logger.warning("[FeiShu] Stream: final update not delivered, falling back to send()")

    return on_event
def fetch_access_token(self) -> str:
url = "https://open.feishu.cn/open-apis/auth/v3/tenant_access_token/internal/"
headers = {
@@ -687,6 +819,65 @@ class FeiShuChanel(ChatChannel):
except Exception as e:
logger.warning(f"[FeiShu] Failed to remove temp file {temp_file}: {e}")
def _upload_audio(self, audio_path, access_token):
    """
    Upload a local audio file to Feishu and return its file_key.

    Feishu audio messages only play opus, so a file whose name does not end in
    ``.opus`` is first converted with pydub; if conversion fails the original
    file is uploaded as-is (best effort). Any temporary opus file created here
    is removed whether the upload succeeds, is rejected, or raises.

    :param audio_path: plain local file path (no ``file://`` prefix).
    :param access_token: token placed in the Authorization header.
    :return: the ``file_key`` string on success, otherwise ``None``.
    """
    logger.debug(f"[FeiShu] start upload audio, path={audio_path}")

    if not os.path.exists(audio_path):
        logger.error(f"[FeiShu] audio file not found: {audio_path}")
        return None

    # Convert if the TTS engine produced a different format (e.g. mp3 from OpenAI TTS).
    upload_path = audio_path
    temp_opus_path = None  # set once this call may have created a file to clean up
    if not audio_path.lower().endswith('.opus'):
        opus_path = os.path.splitext(audio_path)[0] + '.opus'
        try:
            from pydub import AudioSegment
            audio = AudioSegment.from_file(audio_path)
            temp_opus_path = opus_path
            # export() returns the opened output file; close it so the handle
            # is not leaked and the file can be removed afterwards.
            audio.export(opus_path, format='opus').close()
            upload_path = opus_path
            logger.info(f"[FeiShu] Converted audio to opus: {opus_path}")
        except Exception as e:
            logger.warning(f"[FeiShu] Failed to convert audio to opus, uploading original: {e}")
            upload_path = audio_path

    file_name = os.path.splitext(os.path.basename(upload_path))[0] + '.opus'
    upload_url = "https://open.feishu.cn/open-apis/im/v1/files"
    data = {'file_type': 'opus', 'file_name': file_name}
    headers = {'Authorization': f'Bearer {access_token}'}

    try:
        with open(upload_path, "rb") as f:
            upload_response = requests.post(
                upload_url,
                files={"file": f},
                data=data,
                headers=headers,
                timeout=(5, 30)
            )
        logger.info(
            f"[FeiShu] upload audio response, status={upload_response.status_code}, res={upload_response.content}")
        response_data = upload_response.json()
        if response_data.get("code") == 0:
            return (response_data.get("data") or {}).get("file_key")
        logger.error(f"[FeiShu] upload audio failed: {response_data}")
        return None
    except Exception as e:
        logger.error(f"[FeiShu] upload audio exception: {e}")
        return None
    finally:
        # Remove the temporary opus file on every exit path, not only on
        # success, so failed uploads do not pile up files on disk.
        if temp_opus_path and os.path.exists(temp_opus_path):
            try:
                os.remove(temp_opus_path)
            except Exception as e:
                logger.warning(f"[FeiShu] Failed to remove temp opus file {temp_opus_path}: {e}")
def _upload_file_url(self, file_url, access_token):
"""
Upload file to Feishu

View File

@@ -162,6 +162,38 @@ class FeishuMessage(ChatMessage):
else:
logger.info(f"[FeiShu] Failed to download file, key={file_key}, res={response.text}")
self._prepare_fn = _download_file
elif msg_type == "audio":
# 飞书用户发送的语音消息类型为 "audio",文件为 opus 编码格式。
# 映射为 ContextType.VOICE,交由 chat_channel 的语音转文字(STT)流程处理。
# 文件通过 _prepare_fn 延迟下载,在 chat_channel 调用 cmsg.prepare() 时才执行。
self.ctype = ContextType.VOICE
content = json.loads(msg.get("content"))
file_key = content.get("file_key")
self.content = TmpDir().path() + file_key + ".opus"
logger.info(f"[FeiShu] audio message: file_key={file_key}, save_path={self.content}")
def _download_audio():
    """
    Download the audio resource of this message and write it to ``self.content``.

    Closure over ``self``, ``file_key`` and ``access_token`` from the enclosing
    scope. Failures are logged and swallowed; nothing is returned.
    """
    logger.info(f"[FeiShu] downloading audio: file_key={file_key}, msg_id={self.msg_id}")
    url = f"https://open.feishu.cn/open-apis/im/v1/messages/{self.msg_id}/resources/{file_key}"
    headers = {
        "Authorization": "Bearer " + access_token,
    }
    params = {
        "type": "file"
    }
    try:
        # requests applies no timeout by default; without one a stalled
        # connection would block the calling worker indefinitely.
        response = requests.get(url=url, headers=headers, params=params, timeout=(5, 30))
        logger.info(f"[FeiShu] download audio response: status={response.status_code}, size={len(response.content)} bytes")
        if response.status_code == 200:
            with open(self.content, "wb") as f:
                f.write(response.content)
            logger.info(f"[FeiShu] audio saved to: {self.content}")
        else:
            logger.error(f"[FeiShu] Failed to download audio, key={file_key}, status={response.status_code}, res={response.text}")
    except Exception as e:
        logger.error(f"[FeiShu] Exception downloading audio, key={file_key}: {e}", exc_info=True)
self._prepare_fn = _download_audio
else:
raise NotImplementedError("Unsupported message type: Type:{} ".format(msg_type))

View File

@@ -24,6 +24,9 @@
"linkai_app_code": "",
"feishu_app_id": "",
"feishu_app_secret": "",
"feishu_bot_name": "",
"feishu_stream_reply": true,
"feishu_stream_interval_ms": 100,
"dingtalk_client_id": "",
"dingtalk_client_secret": "",
"wecom_bot_id": "",

View File

@@ -73,9 +73,14 @@ def any_to_wav(any_path, wav_path):
return
if any_path.endswith(".sil") or any_path.endswith(".silk") or any_path.endswith(".slk"):
return sil_to_wav(any_path, wav_path)
audio = AudioSegment.from_file(any_path)
audio.set_frame_rate(8000) # 百度语音转写支持8000采样率, pcm_s16le, 单通道语音识别
audio.set_channels(1)
# pydub 0.23.0+ 会将 parameters 追加到 ffmpeg 命令的输出文件 `-` 之后,
# 因此 -nostdin 可能被当作"尾部选项"处理,是否生效取决于 ffmpeg 版本。
# 目的是防止后台服务中 ffmpeg 子进程继承父进程的 stdin,避免死锁。
audio = AudioSegment.from_file(any_path, parameters=["-nostdin"])
# AudioSegment 是不可变对象,set_frame_rate/set_channels 返回新对象,不修改原对象。
# 必须将返回值重新赋给 audio,否则修改不会生效。
audio = audio.set_frame_rate(8000) # 百度语音转写支持8000采样率, pcm_s16le, 单通道语音识别
audio = audio.set_channels(1)
audio.export(wav_path, format="wav", codec='pcm_s16le')