mirror of
https://github.com/zhayujie/chatgpt-on-wechat.git
synced 2026-06-02 00:57:41 +08:00
78 lines
3.3 KiB
Python
78 lines
3.3 KiB
Python
#####################################################################
|
|
# xunfei voice service
|
|
# Auth: njnuko
|
|
# Email: njnuko@163.com
|
|
#
|
|
# 要使用本模块, 首先到 xfyun.cn 注册一个开发者账号,
|
|
# 之后创建一个新应用, 然后在应用管理的语音识别或者语音合成右边可以查看 APPID、API Key 和 Secret Key
|
|
# 然后在 config.json 中填入这三个值
|
|
#####################################################################
|
|
|
|
import json
|
|
import os
|
|
import time
|
|
|
|
from bridge.reply import Reply, ReplyType
|
|
from common.log import logger
|
|
from common.tmp_dir import TmpDir
|
|
from config import conf
|
|
from voice.voice import Voice
|
|
from .xunfei_asr import xunfei_asr
|
|
from .xunfei_tts import xunfei_tts
|
|
from voice.audio_convert import any_to_mp3
|
|
import shutil
|
|
from pydub import AudioSegment
|
|
|
|
|
|
class XunfeiVoice(Voice):
    """Voice backend that delegates speech recognition and synthesis to Xunfei.

    Credentials and per-request business arguments are read from the
    ``config.json`` file located in the same directory as this module.
    """

    def __init__(self):
        """Load Xunfei credentials and business arguments from ``config.json``.

        A failure to read or parse the file is logged and otherwise ignored,
        so construction never raises; in that case the attributes keep their
        ``None`` defaults.
        """
        # Define every attribute up front so later calls fail with a clear
        # service error instead of an AttributeError when loading fails.
        self.APPID = None
        self.APIKey = None
        self.APISecret = None
        self.BusinessArgsTTS = None
        self.BusinessArgsASR = None
        try:
            curdir = os.path.dirname(__file__)
            config_path = os.path.join(curdir, "config.json")
            # Named `settings` (not `conf`) so the module-level `conf`
            # imported from `config` is not shadowed.
            with open(config_path, "r", encoding="utf-8") as fr:
                settings = json.load(fr)
            # The loaded settings are intentionally not printed or logged:
            # they contain the API key and secret.
            self.APPID = str(settings.get("APPID"))
            self.APIKey = str(settings.get("APIKey"))
            self.APISecret = str(settings.get("APISecret"))
            self.BusinessArgsTTS = settings.get("BusinessArgsTTS")
            self.BusinessArgsASR = settings.get("BusinessArgsASR")
        except Exception as e:
            logger.warning("XunfeiVoice init failed: %s, ignore " % e)

    def voiceToText(self, voice_file):
        """Transcribe a local audio file with the Xunfei recognition service.

        :param voice_file: path of the local audio file to recognise
        :return: a ``ReplyType.TEXT`` reply holding the recognised text, or a
            ``ReplyType.ERROR`` reply describing the failure
        """
        try:
            logger.debug("[Xunfei] voice file name={}".format(voice_file))
            # NOTE(review): the secret/key order here differs from the
            # xunfei_tts call below; kept exactly as in the original call,
            # confirm against the helper signatures.
            text = xunfei_asr(
                self.APPID,
                self.APISecret,
                self.APIKey,
                self.BusinessArgsASR,
                voice_file,
            )
            logger.info("讯飞语音识别到了: {}".format(text))
            reply = Reply(ReplyType.TEXT, text)
        except Exception as e:
            logger.warning("[Xunfei] voiceToText failed: %s" % e)
            reply = Reply(ReplyType.ERROR, "讯飞语音识别出错了;{0}".format(e))
        return reply

    def textToVoice(self, text):
        """Synthesise ``text`` into an mp3 file with the Xunfei TTS service.

        :param text: the text to convert to speech
        :return: a ``ReplyType.VOICE`` reply holding the generated file path,
            or a ``ReplyType.ERROR`` reply when synthesis fails
        """
        # Bound before the try block so the error handler can always refer
        # to it, even when building the path itself raises.
        fileName = None
        try:
            # Timestamp plus text hash avoids identical file names when
            # several threads synthesise at the same time.
            fileName = (
                TmpDir().path()
                + "reply-"
                + str(int(time.time()))
                + "-"
                + str(hash(text) & 0x7FFFFFFF)
                + ".mp3"
            )
            xunfei_tts(
                self.APPID,
                self.APIKey,
                self.APISecret,
                self.BusinessArgsTTS,
                text,
                fileName,
            )
            logger.info("[Xunfei] textToVoice text={} voice file name={}".format(text, fileName))
            reply = Reply(ReplyType.VOICE, fileName)
        except Exception as e:
            logger.error("[Xunfei] textToVoice error={} voice file name={}".format(e, fileName))
            reply = Reply(ReplyType.ERROR, "抱歉,讯飞语音合成失败")
        return reply
|