mirror of
https://github.com/zhayujie/chatgpt-on-wechat.git
synced 2026-06-02 00:57:41 +08:00
Merge pull request #2791 from ooaaooaa123/feat/feishu-voice-stream-reply
feat(feishu): 支持语音消息收发与流式打字机回复
This commit is contained in:
@@ -61,6 +61,10 @@ class FeiShuChanel(ChatChannel):
|
||||
feishu_app_secret = conf().get('feishu_app_secret')
|
||||
feishu_token = conf().get('feishu_token')
|
||||
feishu_event_mode = conf().get('feishu_event_mode', 'websocket') # webhook 或 websocket
|
||||
# 覆盖父类默认值 [ReplyType.VOICE, ReplyType.IMAGE]。
|
||||
# 飞书原生支持发送音频(opus 格式,通过文件上传接口)和图片,
|
||||
# 所有回复类型均已处理,置为空列表以启用语音和图片回复。
|
||||
NOT_SUPPORT_REPLYTYPE = []
|
||||
|
||||
def __init__(self):
|
||||
super().__init__()
|
||||
@@ -384,10 +388,20 @@ class FeiShuChanel(ChatChannel):
|
||||
no_need_at=True
|
||||
)
|
||||
if context:
|
||||
# 流式回复模式:向 context 注入 on_event 回调,agent 每产出一段文字时会调用它。
|
||||
# 回调内部先发送一条占位消息获取 message_id,之后通过 PATCH 接口原地更新内容,
|
||||
# 实现打字机效果。回调结束时设置 context["feishu_streamed"]=True,
|
||||
# 让 send() 跳过重复发送,避免最终完整回复再被重复投递一次。
|
||||
if conf().get("feishu_stream_reply", False):
|
||||
context["on_event"] = self._make_feishu_stream_callback(context, feishu_msg.access_token)
|
||||
self.produce(context)
|
||||
logger.debug(f"[FeiShu] query={feishu_msg.content}, type={feishu_msg.ctype}")
|
||||
|
||||
def send(self, reply: Reply, context: Context):
|
||||
# 如果文本回复已通过流式传输发送,则跳过重复发送
|
||||
if reply.type == ReplyType.TEXT and context.get("feishu_streamed"):
|
||||
logger.debug("[FeiShu] streaming already delivered text reply, skipping send()")
|
||||
return
|
||||
msg = context.get("msg")
|
||||
is_group = context["isgroup"]
|
||||
if msg:
|
||||
@@ -450,6 +464,16 @@ class FeiShuChanel(ChatChannel):
|
||||
msg_type = "file"
|
||||
content_key = "file_key"
|
||||
|
||||
elif reply.type == ReplyType.VOICE:
|
||||
# 语音回复:上传音频文件到飞书,然后发送 audio 类型消息
|
||||
file_key = self._upload_audio(reply.content, access_token)
|
||||
if not file_key:
|
||||
logger.warning("[FeiShu] upload audio failed")
|
||||
return
|
||||
reply_content = file_key
|
||||
msg_type = "audio"
|
||||
content_key = "file_key"
|
||||
|
||||
# Check if we can reply to an existing message (need msg_id)
|
||||
can_reply = is_group and msg and hasattr(msg, 'msg_id') and msg.msg_id
|
||||
|
||||
@@ -481,6 +505,114 @@ class FeiShuChanel(ChatChannel):
|
||||
else:
|
||||
logger.error(f"[FeiShu] send message failed, code={res.get('code')}, msg={res.get('msg')}")
|
||||
|
||||
def _make_feishu_stream_callback(self, context, access_token):
|
||||
"""
|
||||
Create an on_event callback for streaming text replies via Feishu message edit API.
|
||||
First message_update event triggers a placeholder send to get a message_id;
|
||||
subsequent events throttle-update that message in place.
|
||||
agent_end does a final update and marks context["feishu_streamed"] = True
|
||||
to suppress the duplicate send() call from chat_channel.
|
||||
"""
|
||||
import time as _time
|
||||
import threading as _threading
|
||||
|
||||
streamed_text = [""]
|
||||
message_id = [None]
|
||||
last_update_time = [0.0]
|
||||
lock = _threading.Lock()
|
||||
|
||||
msg = context.get("msg")
|
||||
is_group = context.get("isgroup", False)
|
||||
receiver = context.get("receiver")
|
||||
receive_id_type = context.get("receive_id_type", "open_id")
|
||||
interval_s = conf().get("feishu_stream_interval_ms", 500) / 1000.0
|
||||
|
||||
headers = {
|
||||
"Authorization": "Bearer " + access_token,
|
||||
"Content-Type": "application/json",
|
||||
}
|
||||
|
||||
def _make_card(text):
|
||||
# update_multi=true is required by Feishu for PATCH updates to be visible to all recipients.
|
||||
return json.dumps({
|
||||
"config": {"update_multi": True},
|
||||
"elements": [
|
||||
{
|
||||
"tag": "div",
|
||||
"text": {"tag": "lark_md", "content": text},
|
||||
}
|
||||
],
|
||||
})
|
||||
|
||||
def _send_placeholder():
|
||||
can_reply = is_group and msg and hasattr(msg, 'msg_id') and msg.msg_id
|
||||
placeholder_content = _make_card("...")
|
||||
try:
|
||||
if can_reply:
|
||||
url = f"https://open.feishu.cn/open-apis/im/v1/messages/{msg.msg_id}/reply"
|
||||
data = {"msg_type": "interactive", "content": placeholder_content}
|
||||
res = requests.post(url=url, headers=headers, json=data, timeout=(5, 10))
|
||||
else:
|
||||
url = "https://open.feishu.cn/open-apis/im/v1/messages"
|
||||
params = {"receive_id_type": receive_id_type}
|
||||
data = {
|
||||
"receive_id": receiver,
|
||||
"msg_type": "interactive",
|
||||
"content": placeholder_content,
|
||||
}
|
||||
res = requests.post(url=url, headers=headers, params=params, json=data, timeout=(5, 10))
|
||||
res_json = res.json()
|
||||
if res_json.get("code") == 0:
|
||||
mid = res_json["data"]["message_id"]
|
||||
message_id[0] = mid
|
||||
logger.info(f"[FeiShu] Stream: placeholder sent, message_id={mid}")
|
||||
else:
|
||||
logger.error(f"[FeiShu] Stream: placeholder failed: {res_json}")
|
||||
except Exception as e:
|
||||
logger.error(f"[FeiShu] Stream: placeholder exception: {e}")
|
||||
|
||||
def _patch_message(text):
|
||||
if not message_id[0]:
|
||||
return
|
||||
# PATCH updates interactive (card) messages; no edit-count limit per Feishu docs.
|
||||
url = f"https://open.feishu.cn/open-apis/im/v1/messages/{message_id[0]}"
|
||||
data = {"content": _make_card(text)}
|
||||
try:
|
||||
res = requests.patch(url=url, headers=headers, json=data, timeout=(5, 10))
|
||||
res_json = res.json()
|
||||
if res_json.get("code") != 0:
|
||||
logger.warning(f"[FeiShu] Stream: patch failed: {res_json}")
|
||||
except Exception as e:
|
||||
logger.warning(f"[FeiShu] Stream: patch exception: {e}")
|
||||
|
||||
def on_event(event: dict):
|
||||
event_type = event.get("type")
|
||||
data = event.get("data", {})
|
||||
|
||||
if event_type == "message_update":
|
||||
delta = data.get("delta", "")
|
||||
if not delta:
|
||||
return
|
||||
with lock:
|
||||
# 在锁内发送占位消息,防止并发的 message_update 事件各自触发一次占位发送,
|
||||
# 导致发出多条 "..." 占位消息。
|
||||
if message_id[0] is None:
|
||||
_send_placeholder()
|
||||
streamed_text[0] += delta
|
||||
now = _time.time()
|
||||
if now - last_update_time[0] >= interval_s:
|
||||
last_update_time[0] = now
|
||||
_patch_message(streamed_text[0])
|
||||
|
||||
elif event_type == "agent_end":
|
||||
final_response = data.get("final_response", "")
|
||||
with lock:
|
||||
if message_id[0] and final_response:
|
||||
_patch_message(str(final_response))
|
||||
context["feishu_streamed"] = True
|
||||
|
||||
return on_event
|
||||
|
||||
def fetch_access_token(self) -> str:
|
||||
url = "https://open.feishu.cn/open-apis/auth/v3/tenant_access_token/internal/"
|
||||
headers = {
|
||||
@@ -687,6 +819,65 @@ class FeiShuChanel(ChatChannel):
|
||||
except Exception as e:
|
||||
logger.warning(f"[FeiShu] Failed to remove temp file {temp_file}: {e}")
|
||||
|
||||
def _upload_audio(self, audio_path, access_token):
|
||||
"""
|
||||
Upload a local audio file to Feishu and return file_key.
|
||||
audio_path is a plain local file path (no file:// prefix).
|
||||
Feishu audio messages only support opus format; non-opus files are converted first.
|
||||
"""
|
||||
logger.debug(f"[FeiShu] start upload audio, path={audio_path}")
|
||||
|
||||
if not os.path.exists(audio_path):
|
||||
logger.error(f"[FeiShu] audio file not found: {audio_path}")
|
||||
return None
|
||||
|
||||
# Feishu only plays audio messages in opus format.
|
||||
# Convert if the TTS engine produced a different format (e.g. mp3 from OpenAI TTS).
|
||||
upload_path = audio_path
|
||||
if not audio_path.lower().endswith('.opus'):
|
||||
opus_path = os.path.splitext(audio_path)[0] + '.opus'
|
||||
try:
|
||||
from pydub import AudioSegment
|
||||
audio = AudioSegment.from_file(audio_path)
|
||||
audio.export(opus_path, format='opus')
|
||||
upload_path = opus_path
|
||||
logger.info(f"[FeiShu] Converted audio to opus: {opus_path}")
|
||||
except Exception as e:
|
||||
logger.warning(f"[FeiShu] Failed to convert audio to opus, uploading original: {e}")
|
||||
upload_path = audio_path
|
||||
|
||||
file_name = os.path.splitext(os.path.basename(upload_path))[0] + '.opus'
|
||||
upload_url = "https://open.feishu.cn/open-apis/im/v1/files"
|
||||
data = {'file_type': 'opus', 'file_name': file_name}
|
||||
headers = {'Authorization': f'Bearer {access_token}'}
|
||||
|
||||
try:
|
||||
with open(upload_path, "rb") as f:
|
||||
upload_response = requests.post(
|
||||
upload_url,
|
||||
files={"file": f},
|
||||
data=data,
|
||||
headers=headers,
|
||||
timeout=(5, 30)
|
||||
)
|
||||
logger.info(
|
||||
f"[FeiShu] upload audio response, status={upload_response.status_code}, res={upload_response.content}")
|
||||
response_data = upload_response.json()
|
||||
if response_data.get("code") == 0:
|
||||
# 若进行了格式转换,上传成功后清理临时 opus 文件,避免磁盘堆积
|
||||
if upload_path != audio_path and os.path.exists(upload_path):
|
||||
try:
|
||||
os.remove(upload_path)
|
||||
except Exception as e:
|
||||
logger.warning(f"[FeiShu] Failed to remove temp opus file {upload_path}: {e}")
|
||||
return response_data.get("data").get("file_key")
|
||||
else:
|
||||
logger.error(f"[FeiShu] upload audio failed: {response_data}")
|
||||
return None
|
||||
except Exception as e:
|
||||
logger.error(f"[FeiShu] upload audio exception: {e}")
|
||||
return None
|
||||
|
||||
def _upload_file_url(self, file_url, access_token):
|
||||
"""
|
||||
Upload file to Feishu
|
||||
|
||||
@@ -162,6 +162,38 @@ class FeishuMessage(ChatMessage):
|
||||
else:
|
||||
logger.info(f"[FeiShu] Failed to download file, key={file_key}, res={response.text}")
|
||||
self._prepare_fn = _download_file
|
||||
elif msg_type == "audio":
|
||||
# 飞书用户发送的语音消息类型为 "audio",文件为 opus 编码格式。
|
||||
# 映射为 ContextType.VOICE,交由 chat_channel 的语音转文字(STT)流程处理。
|
||||
# 文件通过 _prepare_fn 延迟下载,在 chat_channel 调用 cmsg.prepare() 时才执行。
|
||||
self.ctype = ContextType.VOICE
|
||||
content = json.loads(msg.get("content"))
|
||||
file_key = content.get("file_key")
|
||||
|
||||
self.content = TmpDir().path() + file_key + ".opus"
|
||||
logger.info(f"[FeiShu] audio message: file_key={file_key}, save_path={self.content}")
|
||||
|
||||
def _download_audio():
|
||||
logger.info(f"[FeiShu] downloading audio: file_key={file_key}, msg_id={self.msg_id}")
|
||||
url = f"https://open.feishu.cn/open-apis/im/v1/messages/{self.msg_id}/resources/{file_key}"
|
||||
headers = {
|
||||
"Authorization": "Bearer " + access_token,
|
||||
}
|
||||
params = {
|
||||
"type": "file"
|
||||
}
|
||||
try:
|
||||
response = requests.get(url=url, headers=headers, params=params)
|
||||
logger.info(f"[FeiShu] download audio response: status={response.status_code}, size={len(response.content)} bytes")
|
||||
if response.status_code == 200:
|
||||
with open(self.content, "wb") as f:
|
||||
f.write(response.content)
|
||||
logger.info(f"[FeiShu] audio saved to: {self.content}")
|
||||
else:
|
||||
logger.error(f"[FeiShu] Failed to download audio, key={file_key}, status={response.status_code}, res={response.text}")
|
||||
except Exception as e:
|
||||
logger.error(f"[FeiShu] Exception downloading audio, key={file_key}: {e}", exc_info=True)
|
||||
self._prepare_fn = _download_audio
|
||||
else:
|
||||
raise NotImplementedError("Unsupported message type: Type:{} ".format(msg_type))
|
||||
|
||||
|
||||
@@ -24,8 +24,10 @@
|
||||
"linkai_app_code": "",
|
||||
"feishu_app_id": "",
|
||||
"feishu_app_secret": "",
|
||||
"feishu_bot_name": "",
|
||||
"feishu_stream_reply": false,
|
||||
"dingtalk_client_id": "",
|
||||
"dingtalk_client_secret":"",
|
||||
"dingtalk_client_secret": "",
|
||||
"wecom_bot_id": "",
|
||||
"wecom_bot_secret": "",
|
||||
"web_password": "",
|
||||
|
||||
@@ -73,9 +73,14 @@ def any_to_wav(any_path, wav_path):
|
||||
return
|
||||
if any_path.endswith(".sil") or any_path.endswith(".silk") or any_path.endswith(".slk"):
|
||||
return sil_to_wav(any_path, wav_path)
|
||||
audio = AudioSegment.from_file(any_path)
|
||||
audio.set_frame_rate(8000) # 百度语音转写支持8000采样率, pcm_s16le, 单通道语音识别
|
||||
audio.set_channels(1)
|
||||
# pydub 0.23.0+ 会将 parameters 追加到 ffmpeg 命令的输出文件 `-` 之后,
|
||||
# 因此 -nostdin 可能被当作"尾部选项"处理,是否生效取决于 ffmpeg 版本。
|
||||
# 目的是防止后台服务中 ffmpeg 子进程继承父进程的 stdin,避免死锁。
|
||||
audio = AudioSegment.from_file(any_path, parameters=["-nostdin"])
|
||||
# AudioSegment 是不可变对象:set_frame_rate/set_channels 返回新对象,不修改原对象。
|
||||
# 必须将返回值重新赋给 audio,否则修改不会生效。
|
||||
audio = audio.set_frame_rate(8000) # 百度语音转写支持8000采样率, pcm_s16le, 单通道语音识别
|
||||
audio = audio.set_channels(1)
|
||||
audio.export(wav_path, format="wav", codec='pcm_s16le')
|
||||
|
||||
|
||||
|
||||
Reference in New Issue
Block a user