# File: chatgpt-on-wechat/voice/dashscope/dashscope_voice.py
# (176 lines, 6.6 KiB, Python)

# encoding:utf-8
"""DashScope voice: qwen3-asr-flash (ASR) + qwen3-tts-flash (TTS)
via dashscope.MultiModalConversation."""
import datetime
import os
import random
from typing import Optional
import dashscope
import requests
from dashscope import MultiModalConversation
from bridge.reply import Reply, ReplyType
from common.log import logger
from config import conf
from voice import audio_convert
from voice.voice import Voice
# Speech-recognition model used when the "voice_to_text_model" config entry is unset or empty.
DEFAULT_ASR_MODEL = "qwen3-asr-flash"
# Speech-synthesis model used when the "text_to_voice_model" config entry is unset or empty.
DEFAULT_TTS_MODEL = "qwen3-tts-flash"
# Synthesis voice used when the "tts_voice_id" config entry is unset or empty.
DEFAULT_TTS_VOICE = "Cherry"
# NOTE(review): not referenced anywhere in the visible code; presumably the
# maximum audio duration the ASR model accepts, in seconds - confirm or remove.
MAX_DURATION_SECONDS = 300
# Audio size (10 MiB) above which voiceToText logs a warning; the file is still sent.
MAX_FILE_BYTES = 10 * 1024 * 1024
class DashScopeVoice(Voice):
    """Voice backend that talks to DashScope through ``MultiModalConversation``.

    ``voiceToText`` sends a local audio file for recognition and
    ``textToVoice`` requests synthesized audio and downloads it to a local
    file. Both read their settings from ``conf()`` on every call and always
    return a ``Reply``; failures are logged and reported as
    ``ReplyType.ERROR`` instead of being raised.
    """

    def __init__(self):
        """No per-instance state: configuration is read on each call."""

    def voiceToText(self, voice_file: str):
        """Transcribe the audio file at ``voice_file``.

        Args:
            voice_file: Path to a local audio file. ``.amr``/``.silk``/``.slk``
                files are converted to mp3 first.

        Returns:
            ``Reply(ReplyType.TEXT, text)`` on success, otherwise a
            ``Reply(ReplyType.ERROR, ...)`` with a user-facing message.
        """
        try:
            # Fail fast on missing configuration, before doing any conversion.
            api_key = self._load_api_key()
            if not api_key:
                return Reply(ReplyType.ERROR, "未配置 DashScope API key")

            voice_file = self._ensure_compatible_format(voice_file)
            self._warn_if_oversized(voice_file)

            model = conf().get("voice_to_text_model") or DEFAULT_ASR_MODEL
            file_uri = f"file://{os.path.abspath(voice_file)}"
            messages = [
                {"role": "user", "content": [{"audio": file_uri}]},
            ]
            response = MultiModalConversation.call(
                model=model,
                # Passed explicitly (as textToVoice does) rather than relying
                # only on the module-level dashscope.api_key.
                api_key=api_key,
                messages=messages,
                result_format="message",
                asr_options={"enable_itn": False, "enable_lid": True},
            )
            text = self._extract_text(response)
            if text is None:
                logger.error(f"[DashScopeVoice] voiceToText failed: {response}")
                return Reply(ReplyType.ERROR, "我暂时还无法听清您的语音,请稍后再试吧~")
            logger.info(f"[DashScopeVoice] voiceToText model={model} text={text}")
            return Reply(ReplyType.TEXT, text)
        except Exception as e:
            # Boundary handler: callers expect a Reply, never an exception.
            logger.exception(f"[DashScopeVoice] voiceToText exception: {e}")
            return Reply(ReplyType.ERROR, "我暂时还无法听清您的语音,请稍后再试吧~")

    def textToVoice(self, text: str):
        """Synthesize ``text`` to speech and save the audio locally.

        Args:
            text: The text to speak. Empty or whitespace-only text is rejected
                without calling the API.

        Returns:
            ``Reply(ReplyType.VOICE, local_path)`` on success, otherwise a
            ``Reply(ReplyType.ERROR, ...)`` with a user-facing message.
        """
        try:
            if not text or not text.strip():
                logger.warning("[DashScopeVoice] textToVoice called with empty text")
                return Reply(ReplyType.ERROR, "语音合成失败")

            api_key = self._load_api_key()
            if not api_key:
                return Reply(ReplyType.ERROR, "未配置 DashScope API key")

            model = conf().get("text_to_voice_model") or DEFAULT_TTS_MODEL
            voice = conf().get("tts_voice_id") or DEFAULT_TTS_VOICE
            response = MultiModalConversation.call(
                model=model,
                api_key=api_key,
                text=text,
                voice=voice,
                stream=False,
            )
            url = self._extract_audio_url(response)
            if not url:
                logger.error(f"[DashScopeVoice] textToVoice failed: {response}")
                return Reply(ReplyType.ERROR, "语音合成失败")
            local_path = self._download_audio(url)
            if not local_path:
                # _download_audio has already logged the cause.
                return Reply(ReplyType.ERROR, "语音合成失败")
            logger.info(f"[DashScopeVoice] textToVoice model={model} voice={voice} file={local_path}")
            return Reply(ReplyType.VOICE, local_path)
        except Exception as e:
            # Boundary handler: callers expect a Reply, never an exception.
            logger.exception(f"[DashScopeVoice] textToVoice exception: {e}")
            return Reply(ReplyType.ERROR, "语音合成失败")

    @staticmethod
    def _load_api_key() -> Optional[str]:
        """Return the configured DashScope API key, or ``None`` if unset.

        When a key is present it is also assigned to ``dashscope.api_key``;
        when it is missing an error is logged.
        """
        api_key = conf().get("dashscope_api_key", "")
        if not api_key:
            logger.error("[DashScopeVoice] dashscope_api_key is not configured")
            return None
        dashscope.api_key = api_key
        return api_key

    @staticmethod
    def _warn_if_oversized(voice_file: str) -> None:
        """Log a warning if ``voice_file`` is larger than ``MAX_FILE_BYTES``.

        Best-effort only: a file that cannot be stat'ed is logged and
        otherwise ignored, and the request still goes ahead.
        """
        try:
            size = os.path.getsize(voice_file)
        except OSError as e:
            logger.warning(f"[DashScopeVoice] cannot read size of {voice_file}: {e}")
            return
        if size > MAX_FILE_BYTES:
            logger.warning(
                f"[DashScopeVoice] audio file {size}B exceeds {MAX_FILE_BYTES}B; "
                f"qwen3-asr-flash may reject it"
            )

    @staticmethod
    def _extract_audio_url(response) -> Optional[str]:
        """Return the audio URL from a synthesis response, or ``None``.

        ``None`` is returned for a non-200 ``status_code``, a missing
        ``output``/``audio`` entry, an empty URL, or any unexpected shape.
        """
        try:
            if getattr(response, "status_code", 200) != 200:
                return None
            audio = response.output.get("audio") if response.output else None
            if isinstance(audio, dict):
                return audio.get("url") or None
            return getattr(audio, "url", None)
        except Exception:
            # The response shape is owned by the SDK; treat anything
            # unexpected as "no URL" and let the caller log the response.
            return None

    @staticmethod
    def _download_audio(url: str) -> Optional[str]:
        """Download ``url`` into ``<cwd>/tmp`` and return the local path.

        The extension is taken from the URL path when it is one of
        mp3/wav/m4a/aac/opus and defaults to ``.wav`` otherwise.

        Returns:
            The path of the written file, or ``None`` if the download failed
            or returned an empty body (the cause is logged).
        """
        try:
            tmp_dir = os.path.join(os.getcwd(), "tmp")
            os.makedirs(tmp_dir, exist_ok=True)

            # Drop query string and fragment before looking at the extension.
            url_path = url.split("?", 1)[0].split("#", 1)[0]
            ext = os.path.splitext(url_path)[1].lower()
            if ext not in (".mp3", ".wav", ".m4a", ".aac", ".opus"):
                ext = ".wav"

            resp = requests.get(url, timeout=60)
            resp.raise_for_status()
            if not resp.content:
                # Do not hand a zero-byte file back as a valid voice reply.
                logger.error("[DashScopeVoice] download audio failed: empty response body")
                return None

            # Microsecond timestamp plus a random suffix keeps concurrent
            # downloads from overwriting each other.
            ts = datetime.datetime.now().strftime("%Y%m%d%H%M%S%f")
            dst = os.path.join(tmp_dir, f"dashscope_tts_{ts}_{random.randint(0, 9999):04d}{ext}")
            with open(dst, "wb") as f:
                f.write(resp.content)
            return dst
        except Exception as e:
            logger.error(f"[DashScopeVoice] download audio failed: {e}")
            return None

    @staticmethod
    def _ensure_compatible_format(voice_file: str) -> str:
        """Convert AMR/SILK input to mp3; return other files unchanged.

        Returns:
            The path of the converted mp3 file (same base name), or the
            original path if no conversion was needed or the conversion
            failed (the failure is logged).
        """
        # qwen3-asr-flash doesn't accept AMR/SILK; mp3/wav/m4a/aac/opus pass through.
        if voice_file.lower().endswith((".amr", ".silk", ".slk")):
            try:
                mp3_file = os.path.splitext(voice_file)[0] + ".mp3"
                audio_convert.any_to_mp3(voice_file, mp3_file)
                return mp3_file
            except Exception as e:
                logger.warning(f"[DashScopeVoice] mp3 convert failed: {e}")
        return voice_file

    @staticmethod
    def _extract_text(response) -> Optional[str]:
        """Return the recognized text from a recognition response, or ``None``.

        Reads ``output.choices[0].message.content``, which may be a plain
        string or a list of strings and/or ``{"text": ...}`` dicts; list
        parts are concatenated. ``None`` is returned for a non-200
        ``status_code``, missing choices, empty text, or any unexpected shape.
        """
        try:
            if getattr(response, "status_code", 200) != 200:
                return None
            choices = response.output.get("choices") or []
            if not choices:
                return None
            content = choices[0].get("message", {}).get("content")
            if isinstance(content, str):
                return content.strip() or None
            if isinstance(content, list):
                parts = []
                for item in content:
                    part = item.get("text") if isinstance(item, dict) else item
                    # Skip non-string parts so one malformed entry does not
                    # discard the rest of the transcript.
                    if isinstance(part, str):
                        parts.append(part)
                return "".join(parts).strip() or None
            return None
        except Exception:
            # The response shape is owned by the SDK; treat anything
            # unexpected as "no text" and let the caller log the response.
            return None