python+百度语音识别+星火大模型+讯飞语音合成的语音助手
一直想做个关于大模型的工具,目前大模型有很多,综合考虑(成本、可获得性、性能)下来,选择了讯飞的星火大模型。
(ps: 已经在文末更新,最近小伙伴反馈的“运行报错问题”)
前言
这里python实现一个语音助手,利用工具:百度语音识别API,星火大模型、讯飞语音合成API,实现功能:可以语音唤醒,进行交互信息反馈。虽然比较粗糙,但是可以满足初级需要试一试手,共勉。
1 项目环境与结构
1.1 项目环境
测试时运行的环境为:Windows + Python3.10.3
测试成功运行时所安装的第三方库及其版本如下:
cffi==1.15.1
gevent==23.9.1
greenlet==2.0.2
pycparser==2.21
six==1.16.0
websockets==12.0
websocket-client==1.7.0
anyio==3.7.1
baidu-aip==4.16.11
PyAudio==0.2.14
spark-ai-python==0.3.13
nest-asyncio==1.6.0
urllib3==1.26.9
pyOpenSSL==23.2.0
1.2 项目结构
除了数据文件夹data外,一共有4个py文件,其中,
2 脚本实现
2.1 SparkApi.py
Spark_Model.py内部调用模块
coding: utf-8
&34;&34;&34;
Spark_Model.py内部调用模块
&34;&34;&34;
import _thread as thread
import base64
import datetime
import hashlib
import hmac
import json
from urllib.parse import urlparse
import ssl
from datetime import datetime
from time import mktime
from urllib.parse import urlencode
from wsgiref.handlers import format_date_time
import websocket 使用websocket_client
answer = &34;&34;
sid = &39;&39;
class Ws_Param(object):
    """Builds the authenticated WebSocket URL for the Spark chat API.

    Implements iFlytek's HMAC-SHA256 request-signing scheme: the host, the
    RFC1123 date and the HTTP request line are signed with the APISecret,
    and the result is packed into base64-encoded query parameters.
    """

    def __init__(self, APPID, APIKey, APISecret, Spark_url):
        self.APPID = APPID
        self.APIKey = APIKey
        self.APISecret = APISecret
        self.host = urlparse(Spark_url).netloc
        self.path = urlparse(Spark_url).path
        self.Spark_url = Spark_url

    def create_url(self):
        """Return the signed wss:// URL used to open the chat connection."""
        # RFC1123-formatted timestamp, required by the signature scheme
        now = datetime.now()
        date = format_date_time(mktime(now.timetuple()))
        # Canonical string to sign: host, date, and the HTTP request line
        signature_origin = "host: " + self.host + "\n"
        signature_origin += "date: " + date + "\n"
        signature_origin += "GET " + self.path + " HTTP/1.1"
        # HMAC-SHA256 signature, then base64-encode it
        signature_sha = hmac.new(self.APISecret.encode('utf-8'), signature_origin.encode('utf-8'),
                                 digestmod=hashlib.sha256).digest()
        signature_sha_base64 = base64.b64encode(signature_sha).decode(encoding='utf-8')
        authorization_origin = f'api_key="{self.APIKey}", algorithm="hmac-sha256", headers="host date request-line", signature="{signature_sha_base64}"'
        authorization = base64.b64encode(authorization_origin.encode('utf-8')).decode(encoding='utf-8')
        # Auth parameters are passed as the query string
        v = {
            "authorization": authorization,
            "date": date,
            "host": self.host
        }
        url = self.Spark_url + '?' + urlencode(v)
        # print(url)  # uncomment to compare against the URL built by the official demo
        return url
def on_error(ws, error):
    """WebSocket error callback: just log the error."""
    print(" error:", error)


def on_close(ws, one, two):
    """WebSocket close callback (close code/reason arguments are ignored)."""
    print(" ")


def on_open(ws):
    """WebSocket open callback: send the request from a worker thread."""
    thread.start_new_thread(run, (ws,))


def run(ws, *args):
    # Build the JSON request from attributes attached to the ws object in main()
    data = json.dumps(gen_params(appid=ws.appid, domain=ws.domain, question=ws.question))
    ws.send(data)
def on_message(ws, message):
    """WebSocket message callback: accumulate streamed answer chunks.

    Each frame's content fragment is appended to the module-level ``answer``
    and the session id is recorded in ``sid``.  The socket is closed on a
    non-zero error code, or when the server signals the last frame
    (status == 2).
    """
    data = json.loads(message)
    code = data['header']['code']
    if code != 0:
        print(f'请求错误: {code}, {data}')
        ws.close()
    else:
        global sid
        sid = data["header"]["sid"]
        choices = data["payload"]["choices"]
        status = choices["status"]
        content = choices["text"][0]["content"]
        # Stream fragments to stdout without newlines so the answer reads continuously
        print(content, end="")
        global answer
        answer += content
        if status == 2:
            ws.close()
    # Fix: removed `print(time.time())` — `time` is never imported in this
    # module (only `mktime`), so that line raised NameError on every message.
    # Also dropped the stray debug prints of the raw message and `print(1)`.
def gen_params(appid, domain, question):
    """Build the request payload for the Spark chat API.

    :param appid: application id from the iFlytek console
    :param domain: model domain, e.g. "generalv3.5"
    :param question: conversation history, a list of {role, content} dicts
    :return: payload dict ready for JSON serialization
    """
    data = {
        "header": {
            "app_id": appid,
            "uid": "1234"
        },
        "parameter": {
            "chat": {
                "domain": domain,
                "temperature": 0.8,
                "max_tokens": 2048,
                "top_k": 5,
                "auditing": "default"
            }
        },
        "payload": {
            "message": {
                "text": question
            }
        }
    }
    return data
def main(appid, api_key, api_secret, Spark_url, domain, question):
    """Open a WebSocket to the Spark API and stream one chat completion.

    Blocks until the server closes the connection; the streamed answer
    accumulates in the module-level ``answer`` via ``on_message``.
    """
    wsParam = Ws_Param(appid, api_key, api_secret, Spark_url)
    websocket.enableTrace(False)
    wsUrl = wsParam.create_url()
    ws = websocket.WebSocketApp(wsUrl, on_message=on_message, on_error=on_error,
                                on_close=on_close, on_open=on_open)
    # Stash the request data on the socket object so on_open/run can read it
    ws.appid = appid
    ws.question = question
    ws.domain = domain
    # Certificate verification disabled, matching the official demo
    ws.run_forever(sslopt={"cert_reqs": ssl.CERT_NONE})
2.2 Spark_Model.py
调用讯飞大模型,获取讯飞大模型的结果
coding: utf-8
&34;&34;&34;
调用讯飞大模型,获取讯飞大模型的结果
&34;&34;&34;
import SparkApi

# Credentials from the iFlytek console: https://console.xfyun.cn/services/bm35
appid = "*"       # APPID from the console
api_secret = "*"  # APISecret from the console
api_key = "*"     # APIKey from the console

# Model version and its matching endpoint (v3.5)
domain = "generalv3.5"
Spark_url = "wss://spark-api.xf-yun.com/v3.5/chat"

# Initial conversation context; roles may be system / user / assistant
text = [
    {"role": "system", "content": "你现在扮演李白,你豪情万丈,狂放不羁;接下来请用李白的口吻和用户对话。"},  # persona / background
    {"role": "user", "content": "你是谁"},        # earlier user question
    {"role": "assistant", "content": "....."},    # earlier model answer
    # ... older turns elided ...
    {"role": "user", "content": "你会做什么"}      # latest question; history is optional
]
def getText(role, content):
    """Append a {role, content} message to the shared history and return it.

    Mutates the module-level ``text`` list in place.
    """
    jsoncon = {}
    jsoncon["role"] = role
    jsoncon["content"] = content
    text.append(jsoncon)
    return text
def getlength(text):
    """Return the total number of characters across all message contents."""
    return sum(len(item["content"]) for item in text)
def checklen(text):
    """Drop oldest messages until total content length fits the API limit (8000 chars)."""
    while getlength(text) > 8000:
        del text[0]
    return text
def Api_Run(input):
    """Send ``input`` to the Spark model and return the assistant's reply text.

    Appends the question and the streamed answer to the shared history
    (trimming it to the API's length limit first).  Returns None on failure.
    """
    try:
        question = checklen(getText("user", input))
        print(question)
        SparkApi.answer = ""
        print("星火:", end="")
        SparkApi.main(appid, api_key, api_secret, Spark_url, domain, question)
        print(SparkApi.answer)
        output = getText("assistant", SparkApi.answer)
        # Bug fix: the reply just appended is the LAST history entry;
        # output[1] was a hard-coded old turn ("你是谁"), not the new answer.
        return output[-1]['content']
    except Exception as e:
        print(e)
2.3 Xufi_Voice.py
调用讯飞语音合成API,实现文本转为优化后的语音输出
-*- coding:utf-8 -*-
&34;&34;&34;
调用讯飞语音合成API,实现文本转为优化后的语音输出
&34;&34;&34;
from Spark_Model import appid,api_secret,api_key
import websocket
import datetime
import hashlib
import base64
import hmac
import json
from urllib.parse import urlencode
import time
import ssl
from wsgiref.handlers import format_date_time
from datetime import datetime
from time import mktime
import _thread as thread
import os
import pyaudio
import wave
STATUS_FIRST_FRAME = 0 第一帧的标识
STATUS_CONTINUE_FRAME = 1 中间帧标识
STATUS_LAST_FRAME = 2 最后一帧的标识
# URL-builder class for the TTS service
class Ws_Param(object):
    """Builds the request payload and signed URL for the iFlytek TTS API."""

    def __init__(self, APPID, APIKey, APISecret, Text):
        self.APPID = APPID
        self.APIKey = APIKey
        self.APISecret = APISecret
        self.Text = Text
        # Common parameters
        self.CommonArgs = {"app_id": self.APPID}
        # Business parameters; more options are documented on the iFlytek site.
        # NOTE(review): the voice "x4_lingfeizhe_emo" may be unavailable on a
        # basic account — switch vcn to "xiaoyan" if synthesis fails (see §4).
        self.BusinessArgs = {"aue": "raw", "auf": "audio/L16;rate=16000",
                             "vcn": "x4_lingfeizhe_emo", "tte": "utf8"}
        self.Data = {"status": 2, "text": str(base64.b64encode(self.Text.encode('utf-8')), "UTF8")}
        # For minority languages, UTF-16LE must be used instead.  Kept disabled:
        # tte above is "utf8", so enabling this line would mismatch the encoding.
        # self.Data = {"status": 2, "text": str(base64.b64encode(self.Text.encode('utf-16')), "UTF8")}

    def create_url(self):
        """Return the signed wss:// URL for the TTS endpoint."""
        url = 'wss://tts-api.xfyun.cn/v2/tts'
        # RFC1123-formatted timestamp
        now = datetime.now()
        date = format_date_time(mktime(now.timetuple()))
        # Canonical string to sign: host, date, request line
        signature_origin = "host: " + "tts-api.xfyun.cn" + "\n"
        signature_origin += "date: " + date + "\n"
        signature_origin += "GET " + "/v2/tts " + "HTTP/1.1"
        # HMAC-SHA256 signature, base64-encoded
        signature_sha = hmac.new(self.APISecret.encode('utf-8'), signature_origin.encode('utf-8'),
                                 digestmod=hashlib.sha256).digest()
        signature_sha = base64.b64encode(signature_sha).decode(encoding='utf-8')
        authorization_origin = "api_key=\"%s\", algorithm=\"%s\", headers=\"%s\", signature=\"%s\"" % (
            self.APIKey, "hmac-sha256", "host date request-line", signature_sha)
        authorization = base64.b64encode(authorization_origin.encode('utf-8')).decode(encoding='utf-8')
        # Auth parameters are passed as the query string
        v = {
            "authorization": authorization,
            "date": date,
            "host": "tts-api.xfyun.cn"
        }
        url = url + '?' + urlencode(v)
        # Debug prints kept from the original demo, useful when comparing URLs
        print("date: ", date)
        print("v: ", v)
        print('websocket url :', url)
        return url
# API-driver class for one TTS session
class Make_Sound(object):
    """Drives one TTS session: receives audio frames over the WebSocket,
    accumulates raw PCM, converts it to WAV, and plays it back."""

    def __init__(self, output_pcm, output_wav, wsParam):
        self.output_pcm = output_pcm  # raw PCM file accumulated from the stream
        self.output_wav = output_wav  # WAV file produced by pcm_2_wav
        self.wsParam = wsParam        # Ws_Param carrying the request payload

    def on_message(self, ws, message):
        """Append each decoded audio frame to the PCM file; close on the last frame."""
        try:
            message = json.loads(message)
            code = message["code"]
            sid = message["sid"]
            audio = message["data"]["audio"]
            audio = base64.b64decode(audio)
            status = message["data"]["status"]
            print(message)
            if status == 2:
                # status 2 marks the final frame of the synthesis
                print("ws is closed")
                ws.close()
            if code != 0:
                errMsg = message["message"]
                print("sid:%s call error:%s code is:%s" % (sid, errMsg, code))
            else:
                # Append mode: frames arrive incrementally
                with open(self.output_pcm, 'ab') as f:
                    f.write(audio)
        except Exception as e:
            print("receive msg,but parse exception:", e)

    def on_error(self, ws, error):
        """WebSocket error callback."""
        print(" error:", error)

    def on_close(self, ws, *args):
        """WebSocket close callback."""
        print(" closed ")

    def on_open(self, ws):
        """On connect: send the synthesis request, then delete any stale PCM file."""
        output_pcm = self.output_pcm
        wsParam = self.wsParam

        def run(*args):
            d = {"common": wsParam.CommonArgs,
                 "business": wsParam.BusinessArgs,
                 "data": wsParam.Data,
                 }
            d = json.dumps(d)
            print("------>开始发送文本数据")
            ws.send(d)
            # NOTE(review): the stale PCM is removed *after* sending the request;
            # a first audio frame arriving before the removal would be lost.
            if os.path.exists(output_pcm):
                os.remove(output_pcm)

        thread.start_new_thread(run, ())

    def pcm_2_wav(self):
        """Wrap the raw PCM (mono, 16-bit, 16 kHz) in a WAV container."""
        with open(self.output_pcm, 'rb') as pcmfile:
            pcmdata = pcmfile.read()
        with wave.open(self.output_wav, 'wb') as wavfile:
            # (nchannels=1, sampwidth=2 bytes, framerate=16000, nframes=0, comptype, compname)
            wavfile.setparams((1, 2, 16000, 0, 'NONE', 'NONE'))
            wavfile.writeframes(pcmdata)
        now_time2 = time.strftime("%Y-%m-%d %H:%M:%S", time.gmtime())
        print("转wav结束时间" + str(now_time2))

    def sound_out(self):
        """Play the WAV file through the default audio device."""
        CHUNK = 1024
        FORMAT = pyaudio.paInt16
        CHANNELS = 1
        RATE = 16000
        wf = wave.open(self.output_wav, 'rb')
        p = pyaudio.PyAudio()
        stream = p.open(format=FORMAT,
                        channels=CHANNELS,
                        rate=RATE,
                        output=True,
                        frames_per_buffer=CHUNK, )
        data = wf.readframes(CHUNK)
        while len(data) > 0:
            stream.write(data)
            data = wf.readframes(CHUNK)
        stream.stop_stream()
        stream.close()
        p.terminate()
        # Fix: close the wave reader; the original leaked the file handle
        wf.close()
def Run_Voice(output_pcm, output_wav, text):
    """Synthesize ``text`` via iFlytek TTS, convert it to WAV, and play it.

    Credentials (appid / api_secret / api_key) are imported from Spark_Model.
    """
    wsParam = Ws_Param(APPID=appid, APISecret=api_secret, APIKey=api_key, Text=text)
    websocket.enableTrace(False)
    wsUrl = wsParam.create_url()
    ms = Make_Sound(output_pcm, output_wav, wsParam)
    ws = websocket.WebSocketApp(wsUrl, on_message=ms.on_message, on_error=ms.on_error, on_close=ms.on_close)
    ws.on_open = ms.on_open
    # Blocks until the server sends the final frame and the socket closes
    ws.run_forever(sslopt={"cert_reqs": ssl.CERT_NONE})
    # Convert the accumulated PCM to WAV, then play it
    ms.pcm_2_wav()
    ms.sound_out()
    now_time = time.strftime("%Y-%m-%d %H:%M:%S", time.gmtime())
    print("播放完毕:" + str(now_time))
2.4 main.py
主运行程序,调用百度语音识别API,实现语音唤醒,进行语音交互
-*- coding:utf-8 -*-
&34;&34;&34;
1.先进行语音唤醒
这里如何实现:
2.语音助手进行应答反馈
3.用户语音咨询,语音需要转为文本
4.文本告诉助手,助手调用讯飞大模型
5.获取讯飞大模型的结果
6.文本转为语音反馈
&34;&34;&34;
from Xufi_Voice import Run_Voice
import Spark_Model
import win32com.client
import pyaudio
import wave
from aip import AipSpeech
import os
class Wake_Up:
    """Records microphone audio and converts it to text via Baidu ASR."""

    def __init__(self, APP_ID, API_KEY, SECRET_KEY, file_path):
        # Baidu ASR credentials
        self.APP_ID = APP_ID
        self.API_KEY = API_KEY
        self.SECRET_KEY = SECRET_KEY
        self.speaker = win32com.client.Dispatch("SAPI.SpVoice")  # Windows built-in TTS
        self.file_path = file_path  # where the recorded WAV is saved

    def record_sound(self):
        """Record ~5 seconds from the microphone to self.file_path (mono, 16-bit, 16 kHz)."""
        CHUNK = 1024
        FORMAT = pyaudio.paInt16
        CHANNELS = 1
        RATE = 16000
        RECORD_SECONDS = 5
        WAVE_OUTPUT_FILENAME = self.file_path
        pau = pyaudio.PyAudio()
        stream = pau.open(format=FORMAT,
                          channels=CHANNELS,
                          rate=RATE,
                          input=True,
                          frames_per_buffer=CHUNK, )
        frames = []
        print("请说")
        self.speaker.Speak("请说")
        for i in range(0, int(RATE / CHUNK * RECORD_SECONDS)):
            data = stream.read(CHUNK)
            frames.append(data)
        print("好的,等待您的吩咐")
        self.speaker.Speak("好的,已经了解您的需求,请我思考一下")
        stream.stop_stream()
        stream.close()
        pau.terminate()
        wf = wave.open(WAVE_OUTPUT_FILENAME, 'wb')
        wf.setnchannels(CHANNELS)
        wf.setsampwidth(pau.get_sample_size(FORMAT))
        wf.setframerate(RATE)
        wf.writeframes(b''.join(frames))
        wf.close()

    def voice2text(self):
        """Send the recorded audio to Baidu ASR.

        Returns the recognized-text list on success, or '' on any failure.
        """
        client = AipSpeech(self.APP_ID, self.API_KEY, self.SECRET_KEY)
        ret = client.asr(self.get_data(), 'pcm', 16000, {'dev_pid': 1536}, )
        print(ret)
        # Robustness fix: the original only special-cased err_msg ==
        # 'recognition error.' and raised KeyError on every other failure
        # (the 'result' key is absent whenever recognition fails).
        return ret.get('result', '')

    def get_data(self):
        """Read the recorded audio file as bytes."""
        with open(self.file_path, 'rb') as fp:
            return fp.read()

    def del_file(self):
        """Delete the recording, then recreate it empty so the path always exists."""
        file_name = self.file_path
        try:
            os.remove(file_name)
            print(f"Successful deleted {file_name}")
            f = open(file_name, mode="w")  # audio/image/video files would use mode="wb"
            f.close()
            print(f"Successful maked {file_name}")
        except FileNotFoundError:
            print(f"{file_name} not found")
def Run_Talk(APP_ID, API_KEY, SECRET_KEY, file_path):
    """Main interaction loop: wait for the wake word, then answer one spoken question."""
    output_pcm = './data/demo.pcm'
    output_wav = './data/demo.wav'
    # Build the recorder/recognizer
    wk = Wake_Up(APP_ID, API_KEY, SECRET_KEY, file_path)
    while True:
        # Record and transcribe a wake-word attempt
        wk.record_sound()
        chat_message = wk.voice2text()
        print(chat_message)
        # NOTE(review): comments elsewhere say the wake word is "鸭蛋", but the
        # code compares against '今天' — confirm which value is intended.
        if len(chat_message) > 0 and chat_message[0] == '今天':
            # Wake word recognized
            wk.del_file()
            print('语音唤醒完毕')
            Run_Voice(output_pcm, output_wav, '我在,请问有何吩咐')
            wk.speaker.Speak("我在,请问有何吩咐")
            print('我在,请问有何吩咐')
            # Record the actual spoken request
            wk.record_sound()
            chat_message = wk.voice2text()
            wk.speaker.Speak('好的,请稍等')
            Run_Voice(output_pcm, output_wav, '好的,请稍等')
            print(chat_message)
            # Forward the request to the Spark model and speak its answer
            if len(chat_message) > 0:
                Input = chat_message[0]
                output = Spark_Model.Api_Run(Input)
                print(output)
                wk.speaker.Speak(output)
                Run_Voice(output_pcm, output_wav, output)
                break
        else:
            continue
if __name__ == '__main__':
    # Path for the recorded wake-word/request audio
    file_path = "./data/chat-audio.wav"
    # Baidu ASR credentials (redacted placeholders in the article)
    APP_ID = '63'
    API_KEY = 'mNfTy'
    SECRET_KEY = '29gS'
    Run_Talk(APP_ID, API_KEY, SECRET_KEY, file_path)
3 项目缺陷
1.大模型反馈信息太多,不精炼
2.语音识别在嘈杂环境下,会受干扰
3.不能连续语音、上下文交互,但是可以优化
4 运行问题反馈(2025-06-24)
如果运行中报错,且报错位置在Xufi_Voice.py文件中的pcm_2_wav,经分析原因是讯飞语音合成发音人失效了,这里替换为基础发音人即可,如xiaoyan