diff --git a/channel/feishu/feishu_channel.py b/channel/feishu/feishu_channel.py
index 52db9853..e9be8a3a 100644
--- a/channel/feishu/feishu_channel.py
+++ b/channel/feishu/feishu_channel.py
@@ -61,6 +61,10 @@ class FeiShuChanel(ChatChannel):
     feishu_app_secret = conf().get('feishu_app_secret')
     feishu_token = conf().get('feishu_token')
     feishu_event_mode = conf().get('feishu_event_mode', 'websocket')  # webhook 或 websocket
+    # Override the parent default [ReplyType.VOICE, ReplyType.IMAGE].
+    # Feishu natively supports sending audio (opus, via the file upload API) and images,
+    # and every reply type is handled here, so the list is empty to enable voice and image replies.
+    NOT_SUPPORT_REPLYTYPE = []
 
     def __init__(self):
         super().__init__()
@@ -384,10 +388,20 @@ class FeiShuChanel(ChatChannel):
             no_need_at=True
         )
         if context:
+            # Streaming reply mode: inject an on_event callback that the agent calls for each text chunk.
+            # The callback sends a placeholder to obtain a message_id, then PATCHes it in place
+            # (typewriter effect). After the final update succeeds it sets context["feishu_streamed"]=True
+            # so that send() skips the final full reply instead of delivering it a second time.
+            if conf().get("feishu_stream_reply", False):
+                context["on_event"] = self._make_feishu_stream_callback(context, feishu_msg.access_token)
             self.produce(context)
         logger.debug(f"[FeiShu] query={feishu_msg.content}, type={feishu_msg.ctype}")
 
     def send(self, reply: Reply, context: Context):
+        # Skip the send if the text reply was already delivered by streaming
+        if reply.type == ReplyType.TEXT and context.get("feishu_streamed"):
+            logger.debug("[FeiShu] streaming already delivered text reply, skipping send()")
+            return
         msg = context.get("msg")
         is_group = context["isgroup"]
         if msg:
@@ -450,6 +464,16 @@ class FeiShuChanel(ChatChannel):
                 msg_type = "file"
                 content_key = "file_key"
 
+        elif reply.type == ReplyType.VOICE:
+            # Voice reply: upload the audio file to Feishu, then send an "audio" message
+            file_key = self._upload_audio(reply.content, access_token)
+            if not file_key:
+                logger.warning("[FeiShu] upload audio failed")
+                return
+            reply_content = file_key
+            msg_type = "audio"
+            content_key = "file_key"
+
         # Check if we can reply to an existing message (need msg_id)
         can_reply = is_group and msg and hasattr(msg, 'msg_id') and msg.msg_id
 
@@ -481,6 +505,130 @@ class FeiShuChanel(ChatChannel):
         else:
             logger.error(f"[FeiShu] send message failed, code={res.get('code')}, msg={res.get('msg')}")
 
+    def _make_feishu_stream_callback(self, context, access_token):
+        """
+        Create an on_event callback that streams a text reply by editing one Feishu message.
+
+        The first non-empty "message_update" event sends an interactive-card placeholder to
+        obtain a message_id; later events PATCH that card in place, throttled by
+        feishu_stream_interval_ms. On "agent_end" the final text is patched in, and
+        context["feishu_streamed"] is set to True only when that final update succeeded.
+        If the placeholder or the final update fails, the flag stays unset so that send()
+        still delivers the complete reply instead of silently dropping it.
+
+        :param context: message context; reads msg, isgroup, receiver and receive_id_type,
+            and writes feishu_streamed
+        :param access_token: tenant access token used as the Bearer credential
+        :return: callable that accepts one event dict of the form {"type": ..., "data": {...}}
+        """
+        import time as _time
+        import threading as _threading
+
+        # Mutable state shared by the nested helpers; only touched while holding `lock`.
+        state = {
+            "text": "",                      # all deltas received so far
+            "message_id": None,              # id of the card message being edited
+            "placeholder_failed": False,     # set once so a failing send is not retried per delta
+            "last_update": float("-inf"),    # monotonic time of the last throttled PATCH
+        }
+        lock = _threading.Lock()
+
+        msg = context.get("msg")
+        is_group = context.get("isgroup", False)
+        receiver = context.get("receiver")
+        receive_id_type = context.get("receive_id_type", "open_id")
+        interval_s = conf().get("feishu_stream_interval_ms", 500) / 1000.0
+
+        headers = {
+            "Authorization": "Bearer " + access_token,
+            "Content-Type": "application/json",
+        }
+
+        def _make_card(text):
+            # update_multi=true is required by Feishu for PATCH updates to be visible to all recipients.
+            return json.dumps({
+                "config": {"update_multi": True},
+                "elements": [
+                    {
+                        "tag": "div",
+                        "text": {"tag": "lark_md", "content": text},
+                    }
+                ],
+            })
+
+        def _send_placeholder():
+            # Returns True once a placeholder exists and its message_id is stored.
+            can_reply = is_group and msg and hasattr(msg, 'msg_id') and msg.msg_id
+            data = {"msg_type": "interactive", "content": _make_card("...")}
+            try:
+                if can_reply:
+                    url = f"https://open.feishu.cn/open-apis/im/v1/messages/{msg.msg_id}/reply"
+                    res = requests.post(url=url, headers=headers, json=data, timeout=(5, 10))
+                else:
+                    url = "https://open.feishu.cn/open-apis/im/v1/messages"
+                    params = {"receive_id_type": receive_id_type}
+                    data["receive_id"] = receiver
+                    res = requests.post(url=url, headers=headers, params=params, json=data, timeout=(5, 10))
+                res_json = res.json()
+                if res_json.get("code") == 0:
+                    state["message_id"] = res_json["data"]["message_id"]
+                    logger.info(f"[FeiShu] Stream: placeholder sent, message_id={state['message_id']}")
+                    return True
+                logger.error(f"[FeiShu] Stream: placeholder failed: {res_json}")
+            except Exception as e:
+                logger.error(f"[FeiShu] Stream: placeholder exception: {e}")
+            return False
+
+        def _patch_message(text):
+            # Returns True only when Feishu accepted the update (code == 0).
+            if not state["message_id"]:
+                return False
+            # PATCH updates interactive (card) messages; no edit-count limit per Feishu docs.
+            url = f"https://open.feishu.cn/open-apis/im/v1/messages/{state['message_id']}"
+            data = {"content": _make_card(text)}
+            try:
+                res = requests.patch(url=url, headers=headers, json=data, timeout=(5, 10))
+                res_json = res.json()
+                if res_json.get("code") == 0:
+                    return True
+                logger.warning(f"[FeiShu] Stream: patch failed: {res_json}")
+            except Exception as e:
+                logger.warning(f"[FeiShu] Stream: patch exception: {e}")
+            return False
+
+        def on_event(event: dict):
+            event_type = event.get("type")
+            data = event.get("data") or {}
+
+            if event_type == "message_update":
+                delta = data.get("delta", "")
+                if not delta:
+                    return
+                with lock:
+                    state["text"] += delta
+                    if state["placeholder_failed"]:
+                        return
+                    # Send the placeholder while holding the lock so that concurrent
+                    # message_update events cannot each post their own "..." message.
+                    if state["message_id"] is None and not _send_placeholder():
+                        state["placeholder_failed"] = True
+                        return
+                    now = _time.monotonic()
+                    if now - state["last_update"] >= interval_s:
+                        state["last_update"] = now
+                        _patch_message(state["text"])
+
+            elif event_type == "agent_end":
+                final_response = data.get("final_response", "")
+                with lock:
+                    # Fall back to the accumulated deltas so the throttled tail is flushed
+                    # even when the agent reports no final_response.
+                    final_text = str(final_response) if final_response else state["text"]
+                    if state["message_id"] and final_text and _patch_message(final_text):
+                        context["feishu_streamed"] = True
+
+        return on_event
+
     def fetch_access_token(self) -> str:
         url = "https://open.feishu.cn/open-apis/auth/v3/tenant_access_token/internal/"
         headers = {
@@ -687,6 +835,72 @@ class FeiShuChanel(ChatChannel):
                 except Exception as e:
                     logger.warning(f"[FeiShu] Failed to remove temp file {temp_file}: {e}")
 
+    def _upload_audio(self, audio_path, access_token):
+        """
+        Upload a local audio file to Feishu and return its file_key.
+
+        Feishu audio messages only support opus, so any other format is converted with
+        pydub first; if the conversion fails the original file is uploaded as a best effort.
+        A temporary opus file created here is always removed before returning.
+
+        :param audio_path: plain local file path (no file:// prefix)
+        :param access_token: tenant access token used as the Bearer credential
+        :return: file_key on success, None on any failure
+        """
+        logger.debug(f"[FeiShu] start upload audio, path={audio_path}")
+
+        if not audio_path or not os.path.exists(audio_path):
+            logger.error(f"[FeiShu] audio file not found: {audio_path}")
+            return None
+
+        # Feishu only plays audio messages in opus format.
+        # Convert if the TTS engine produced a different format (e.g. mp3 from OpenAI TTS).
+        upload_path = audio_path
+        opus_path = None  # temporary file owned by this call; cleaned up in `finally`
+        if not audio_path.lower().endswith('.opus'):
+            opus_path = os.path.splitext(audio_path)[0] + '.opus'
+            try:
+                from pydub import AudioSegment
+                # export() returns the opened output file; close it so the handle is not leaked.
+                AudioSegment.from_file(audio_path).export(opus_path, format='opus').close()
+                upload_path = opus_path
+                logger.info(f"[FeiShu] Converted audio to opus: {opus_path}")
+            except Exception as e:
+                logger.warning(f"[FeiShu] Failed to convert audio to opus, uploading original: {e}")
+
+        file_name = os.path.splitext(os.path.basename(upload_path))[0] + '.opus'
+        upload_url = "https://open.feishu.cn/open-apis/im/v1/files"
+        data = {'file_type': 'opus', 'file_name': file_name}
+        headers = {'Authorization': f'Bearer {access_token}'}
+
+        try:
+            with open(upload_path, "rb") as f:
+                upload_response = requests.post(
+                    upload_url,
+                    files={"file": f},
+                    data=data,
+                    headers=headers,
+                    timeout=(5, 30)
+                )
+            logger.info(
+                f"[FeiShu] upload audio response, status={upload_response.status_code}, res={upload_response.content}")
+            response_data = upload_response.json()
+            if response_data.get("code") == 0:
+                return (response_data.get("data") or {}).get("file_key")
+            logger.error(f"[FeiShu] upload audio failed: {response_data}")
+            return None
+        except Exception as e:
+            logger.error(f"[FeiShu] upload audio exception: {e}")
+            return None
+        finally:
+            # Remove the temporary opus file on success and on failure (a failed export
+            # may leave a partial file behind) so converted audio does not pile up on disk.
+            if opus_path and os.path.exists(opus_path):
+                try:
+                    os.remove(opus_path)
+                except Exception as e:
+                    logger.warning(f"[FeiShu] Failed to remove temp opus file {opus_path}: {e}")
+
     def _upload_file_url(self, file_url, access_token):
         """
         Upload file to Feishu
diff --git a/channel/feishu/feishu_message.py b/channel/feishu/feishu_message.py
index 15cfb175..9acdef2d 100644
--- a/channel/feishu/feishu_message.py
+++ b/channel/feishu/feishu_message.py
@@ -162,6 +162,38 @@ class FeishuMessage(ChatMessage):
                 else:
                     logger.info(f"[FeiShu] Failed to download file, key={file_key}, res={response.text}")
             self._prepare_fn = _download_file
+        elif msg_type == "audio":
+            # Voice messages sent by Feishu users have type "audio"; the file is opus-encoded.
+            # Map to ContextType.VOICE so chat_channel's speech-to-text (STT) flow handles it.
+            # The download is deferred via _prepare_fn and runs when chat_channel calls cmsg.prepare().
+            self.ctype = ContextType.VOICE
+            content = json.loads(msg.get("content"))
+            file_key = content.get("file_key")
+
+            self.content = TmpDir().path() + file_key + ".opus"
+            logger.info(f"[FeiShu] audio message: file_key={file_key}, save_path={self.content}")
+
+            def _download_audio():
+                logger.info(f"[FeiShu] downloading audio: file_key={file_key}, msg_id={self.msg_id}")
+                url = f"https://open.feishu.cn/open-apis/im/v1/messages/{self.msg_id}/resources/{file_key}"
+                headers = {
+                    "Authorization": "Bearer " + access_token,
+                }
+                params = {
+                    "type": "file"
+                }
+                try:
+                    response = requests.get(url=url, headers=headers, params=params, timeout=(5, 30))
+                    logger.info(f"[FeiShu] download audio response: status={response.status_code}, size={len(response.content)} bytes")
+                    if response.status_code == 200:
+                        with open(self.content, "wb") as f:
+                            f.write(response.content)
+                        logger.info(f"[FeiShu] audio saved to: {self.content}")
+                    else:
+                        logger.error(f"[FeiShu] Failed to download audio, key={file_key}, status={response.status_code}, res={response.text}")
+                except Exception as e:
+                    logger.error(f"[FeiShu] Exception downloading audio, key={file_key}: {e}", exc_info=True)
+            self._prepare_fn = _download_audio
         else:
             raise NotImplementedError("Unsupported message type: Type:{} ".format(msg_type))
 
diff --git a/config-template.json b/config-template.json
index 22942793..54ddfcc5 100644
--- a/config-template.json
+++ b/config-template.json
@@ -24,8 +24,11 @@
   "linkai_app_code": "",
   "feishu_app_id": "",
   "feishu_app_secret": "",
+  "feishu_bot_name": "",
+  "feishu_stream_reply": true,
+  "feishu_stream_interval_ms": 500,
   "dingtalk_client_id": "",
-  "dingtalk_client_secret":"",
+  "dingtalk_client_secret": "",
   "wecom_bot_id": "",
   "wecom_bot_secret": "",
   "web_password": "",
diff --git a/voice/audio_convert.py b/voice/audio_convert.py
index f9420ef3..f48c19f6 100644
--- a/voice/audio_convert.py
+++ b/voice/audio_convert.py
@@ -73,9 +73,14 @@ def any_to_wav(any_path, wav_path):
         return
     if any_path.endswith(".sil") or any_path.endswith(".silk") or any_path.endswith(".slk"):
         return sil_to_wav(any_path, wav_path)
-    audio = AudioSegment.from_file(any_path)
-    audio.set_frame_rate(8000)  # 百度语音转写支持8000采样率, pcm_s16le, 单通道语音识别
-    audio.set_channels(1)
+    # pydub 0.23.0+ appends `parameters` after the output file `-` in the ffmpeg command,
+    # so -nostdin may be treated as a trailing option; whether it takes effect depends on the ffmpeg version.
+    # The goal is to keep the ffmpeg child from inheriting the parent's stdin in a background service (deadlock risk).
+    audio = AudioSegment.from_file(any_path, parameters=["-nostdin"])
+    # AudioSegment is immutable: set_frame_rate/set_channels return a new object and leave the original untouched.
+    # The return value must be assigned back to audio, otherwise the change is lost.
+    audio = audio.set_frame_rate(8000)  # Baidu speech recognition expects 8000 Hz, pcm_s16le, mono
+    audio = audio.set_channels(1)
     audio.export(wav_path, format="wav", codec='pcm_s16le')
 
 