2 Star 10 Fork 0

Kratos/语音助手:西瓜

加入 Gitee
与超过 1200万 开发者一起发现、参与优秀开源项目,私有仓库也完全免费 :)
免费加入
文件
克隆/下载
语音助手.py 7.63 KB
一键复制 编辑 原始数据 按行查看 历史
Kratos 提交于 2020-02-26 23:29 . 随便发一下
import json
import re
import subprocess
import wave
import requests
from aip import AipSpeech
import numpy
import speech_recognition
import time
import warnings
import GetBaiduAudio
import Chat
warnings.filterwarnings('ignore')
# 声音阈值
SoundThreshold = 2000
# 超过阈值的端点个数阈值
OverNum = 180
# 百度AIP的配置
APP_ID = '你的APP ID'
API_KEY = '你的Key'
SECRET_KEY = '你的密钥'
client = AipSpeech(APP_ID, API_KEY, SECRET_KEY)
retry = 0
Quicker = "C:\\Program Files\\Quicker\\QuickerStarter.exe"
CommandDict = {}
ActionDict = {}
# 读取命令配置文件
def LoadCommands():
global CommandDict
with open("command.json", "r", encoding="utf-8") as commandF:
CommandDict = json.loads(commandF.read())
# print(CommandDict)
# 读取动作配置文件
def LoadActions():
global ActionDict
with open("action.json", "r", encoding="utf-8") as actionF:
ActionDict = json.loads(actionF.read())
# print(ActionDict)
# 真正录音的函数,时间单位:秒
def real_record(FileName, time):
tmp_R = speech_recognition.Recognizer()
with speech_recognition.Microphone(sample_rate=16000) as tmp_sound:
# 据说可以降噪,好像确实有效果
tmp_R.adjust_for_ambient_noise(tmp_sound)
tmp_audio = tmp_R.record(tmp_sound, duration=time)
# 缓存到本地的做法
with open(FileName, "wb") as f:
f.write(tmp_audio.get_wav_data())
# 判断是否有声音
def LoadWav(FileName):
wavfile = wave.open(FileName, "rb")
nframes = wavfile.getnframes()
# 读取完整的帧数据到str_data中,这是一个string类型的数据
str_data = wavfile.readframes(nframes)
wavfile.close()
# 将波形数据转换成数组
wave_data = numpy.fromstring(str_data, dtype=numpy.short)
# 将wave_data数组改为2列,行数自动匹配
wave_data.shape = -1, 2
# 将数组转置
n = 0
maxnum = 0
wave_data = wave_data.T
for w in wave_data[1]:
if w > maxnum:
maxnum = w
if w > SoundThreshold:
n += 1
# print("======================")
# print("最高声音阈值为:" + str(maxnum) + "/" + str(SoundThreshold))
# print("超过阈值端点数:" + str(n) + "/" + str(OverNum))
return n > OverNum
# 上传到百度AIP进行识别
def UploadBaidu(FileName):
global retry
with open(FileName, "rb") as f:
Audio = f.read()
try:
res = client.asr(Audio, "wav", 16000)
except requests.exceptions.ConnectionError:
retry += 1
if retry <= 3:
print("网络异常,进行第%s/3次重试" % retry)
return UploadBaidu(FileName)
else:
print("重试失败,请检查网络")
return ""
if not res["err_no"]:
# print("识别成功")
retry = 0
return res["result"][0]
elif res["err_no"] == 3310:
print("录音过长")
GetBaiduAudio.PlayAudio("语音包/录音过长,请限制长度.mp3")
elif res["err_no"] == 3301:
print("听不清,请重试")
elif res["err_no"] == 3302:
print("token字段校验失败")
GetBaiduAudio.PlayAudio("语音包/token字段校验失败.mp3")
elif res["err_no"] == 3303:
print("语音服务器识别失败")
GetBaiduAudio.PlayAudio("语音包/语音服务器识别失败.mp3")
elif res["err_no"] == 3304:
print("请求QPS超限,正在重试")
time.sleep(0.5)
return UploadBaidu(FileName)
elif res["err_no"] == 3307:
print("语音服务器后端识别出错")
GetBaiduAudio.PlayAudio("语音包/语音服务器识别失败.mp3")
return UploadBaidu(FileName)
elif res["err_no"] == 3312:
print("用户输入音频格式不正确")
GetBaiduAudio.PlayAudio("语音包/用户输入音频格式不正确.mp3")
else:
print("语音识别失败")
GetBaiduAudio.PlayAudio("语音包/语音识别失败.mp3")
print(res)
# 判断执行哪一条命令
def JudgeCommand(Context):
# 判断是否进入聊天模式
res = re.findall("聊天", Context)
if res:
print("进入聊天模式,每次录音时间为2秒")
ChatMode()
# 判断是否在动作表中
for action in ActionDict:
res = re.findall(action, Context)
if res:
GetBaiduAudio.PlayAudio("语音包/提示音.mp3")
RunQuickerCommand(ActionDict[action])
return True
# 判断是否在命令表中
for command in CommandDict:
res = re.findall(command, Context)
if res:
if command == "测试":
GetBaiduAudio.PlayAudio("语音包/好的,正在为你测试.mp3")
RunQuickerCommand(CommandDict[command])
return True
elif command == "打开":
param = GetParams(Context)
if param:
GetBaiduAudio.PlayAudio("语音包/好的,正在为你打开.mp3")
RunQuickerCommand(CommandDict[command], param)
return True
else:
GetBaiduAudio.PlayAudio("语音包/打开啥?重新说一遍.mp3")
return False
return False
def GetParams(context):
context = context.replace("。", "").replace(" ", "")
textlist = re.findall("(.*)打开(.*)", context)
try:
return textlist[0][1]
except IndexError:
print(context)
print(textlist)
return ""
# 执行Quicker脚本
def RunQuickerCommand(ActionId, param=""):
com_param = "runaction:" + ActionId + " " + param
subprocess.call([Quicker, com_param])
# 循环录音识别
def loopRecord():
while 1:
data = []
real_record("tmp_audio0.wav", 1.2)
AudioMem0 = wave.open("tmp_audio0.wav", "rb")
if LoadWav("tmp_audio0.wav"):
print("检测到声音,继续录制")
GetBaiduAudio.PlayAudio("语音包/提示音.mp3")
real_record("tmp_audio1.wav", 2)
print("录制完成,正在合成")
AudioMem1 = wave.open("tmp_audio1.wav", "rb")
data.append([AudioMem0.getparams(), AudioMem0.readframes(AudioMem0.getnframes())])
AudioMem0.close()
data.append([AudioMem1.getparams(), AudioMem1.readframes(AudioMem1.getnframes())])
AudioMem1.close()
with wave.open("command.wav", "wb") as AudioMem:
AudioMem.setparams(data[0][0])
AudioMem.writeframes(data[0][1])
AudioMem.writeframes(data[1][1])
print("正在连接百度AIP识别")
AipContext = UploadBaidu("command.wav")
if AipContext:
print("识别到语句:" + AipContext)
JudgeCommand(AipContext)
else:
print(AipContext)
def ChatMode():
while 1:
real_record("tmp_chat.wav", 4)
if LoadWav("tmp_chat.wav"):
GetBaiduAudio.PlayAudio("语音包/提示音.mp3")
AipContext = UploadBaidu("tmp_chat.wav")
if AipContext:
print("你:" + AipContext)
exit_res = re.findall("退出.*聊天", AipContext)
if exit_res:
break
res = Chat.RuyiChat(AipContext)
print("西瓜:" + res)
GetBaiduAudio.getBaiduAudio("tmp_get_chat.mp3", res)
GetBaiduAudio.PlayAudio("tmp_get_chat.mp3")
print("退出聊天模式")
if __name__ == '__main__':
print("不欢迎小火车使用西瓜,快点录完视频就把我删了")
LoadCommands()
LoadActions()
print("程序开始监听:")
loopRecord()
Loading...
马建仓 AI 助手
尝试更多
代码解读
代码找茬
代码优化
Python
1
https://gitee.com/KratosMax/voice_assistant_watermelon.git
[email protected]:KratosMax/voice_assistant_watermelon.git
KratosMax
voice_assistant_watermelon
语音助手:西瓜
master

搜索帮助