python+百度语音识别+星火大模型+讯飞语音合成的语音助手

  • 前言
  • 1 项目环境与结构
  • 1.1 项目环境
  • 1.2 项目结构
  • 2 脚本实现
  • 2.1 SparkApi.py
  • 2.2 Spark_Model.py
  • 2.3 Xufi_Voice.py
  • 3 项目缺陷
  • 4 运行问题反馈(2025-06-24)
  • ​ 一直想做个关于大模型的工具,目前大模型有很多,综合考虑(成本、可获得性、性能)下来,选择了讯飞的星火大模型。
    (ps: 已经在文末更新,最近小伙伴反馈的“运行报错问题”)

    前言

    ​ 这里python实现一个语音助手,利用工具:百度语音识别API,星火大模型、讯飞语音合成API,实现功能:可以语音唤醒,进行交互信息反馈。虽然比较粗糙,但是可以满足初级需要试一试手,共勉。

    1 项目环境与结构

    1.1 项目环境
      测试时运行的环境为:Windows + Python3.10.3
      测试成功运行时所安装的第三方库及其版本如下:
       cffi==1.15.1
       gevent==23.9.1
       greenlet==2.0.2
       pycparser==2.21
       six==1.16.0
       websocket==12.0
       websocket-client==1.7.0
       anyio==3.7.1
       baidu-aip==4.16.11
       PyAudio==0.2.14
       spark-ai-python==0.3.13
       nest-asyncio==1.6.0
       urllib3==1.26.9
       pyOpenSSL==23.2.0
    
    1.2 项目结构

    除了数据文件夹data外,一共有4个py文件,其中,

  • SparkApi.py (Spark_Model.py内部调用模块)
  • Spark_Model.py(调用讯飞大模型,获取讯飞大模型的结果)
  • Xufi_Voice.py(调用讯飞语音合成API,实现文本转为优化后的语音输出)
  • main.py(主运行程序,调用百度语音识别API,实现语音唤醒,进行语音交互)
  • 2 脚本实现

    2.1 SparkApi.py

    Spark_Model.py内部调用模块

     coding: utf-8
    &34;&34;&34;
     Spark_Model.py内部调用模块
    &34;&34;&34;
    import _thread as thread
    import base64
    import datetime
    import hashlib
    import hmac
    import json
    from urllib.parse import urlparse
    import ssl
    from datetime import datetime
    from time import mktime
    from urllib.parse import urlencode
    from wsgiref.handlers import format_date_time
    
    import websocket   使用websocket_client
    answer = &34;&34;
    sid = &39;&39;
    
    class Ws_Param(object):
         初始化
        def __init__(self, APPID, APIKey, APISecret, Spark_url):
            self.APPID = APPID
            self.APIKey = APIKey
            self.APISecret = APISecret
            self.host = urlparse(Spark_url).netloc
            self.path = urlparse(Spark_url).path
            self.Spark_url = Spark_url
    
         生成url
        def create_url(self):
             生成RFC1123格式的时间戳
            now = datetime.now()
            date = format_date_time(mktime(now.timetuple()))
    
             拼接字符串
            signature_origin = &34;host: &34; + self.host + &34;\n&34;
            signature_origin += &34;date: &34; + date + &34;\n&34;
            signature_origin += &34;GET &34; + self.path + &34; HTTP/1.1&34;
    
             进行hmac-sha256进行加密
            signature_sha = hmac.new(self.APISecret.encode(&39;utf-8&39;), signature_origin.encode(&39;utf-8&39;),
                                     digestmod=hashlib.sha256).digest()
    
            signature_sha_base64 = base64.b64encode(signature_sha).decode(encoding=&39;utf-8&39;)
    
            authorization_origin = f&39;api_key=&34;{self.APIKey}&34;, algorithm=&34;hmac-sha256&34;, headers=&34;host date request-line&34;, signature=&34;{signature_sha_base64}&34;&39;
    
            authorization = base64.b64encode(authorization_origin.encode(&39;utf-8&39;)).decode(encoding=&39;utf-8&39;)
    
             将请求的鉴权参数组合为字典
            v = {
                &34;authorization&34;: authorization,
                &34;date&34;: date,
                &34;host&34;: self.host
            }
             拼接鉴权参数,生成url
            url = self.Spark_url + &39;?&39; + urlencode(v)
             print(url)
             此处打印出建立连接时候的url,参考本demo的时候可取消上方打印的注释,比对相同参数时生成的url与自己代码生成的url是否一致
            return url
    
    
     收到websocket错误的处理
    def on_error(ws, error):
        print(&34; error:&34;, error)
    
    
     收到websocket关闭的处理
    def on_close(ws,one,two):
        print(&34; &34;)
    
    
     收到websocket连接建立的处理
    def on_open(ws):
        thread.start_new_thread(run, (ws,))
    
    
    def run(ws, *args):
        data = json.dumps(gen_params(appid=ws.appid, domain= ws.domain,question=ws.question))
        ws.send(data)
    
    
     收到websocket消息的处理
    def on_message(ws, message):
         print(message)
         print(time.time())
        data = json.loads(message)
        code = data[&39;header&39;][&39;code&39;]
        if code != 0:
            print(f&39;请求错误: {code}, {data}&39;)
            ws.close()
        else:
            global sid
            sid = data[&34;header&34;][&34;sid&34;]
            choices = data[&34;payload&34;][&34;choices&34;]
            status = choices[&34;status&34;]
            content = choices[&34;text&34;][0][&34;content&34;]
            print(content,end =&34;&34;)
            global answer
            answer += content
             print(1)
            if status == 2:
                ws.close()
    
    
    def gen_params(appid, domain,question):
        &34;&34;&34;
        通过appid和用户的提问来生成请参数
        &34;&34;&34;
        data = {
            &34;header&34;: {
                &34;app_id&34;: appid,
                &34;uid&34;: &34;1234&34;
            },
            &34;parameter&34;: {
    
                &34;chat&34;: {
                    &34;domain&34;: domain,
                    &34;temperature&34;: 0.8,
                    &34;max_tokens&34;: 2048,
                    &34;top_k&34;: 5,
    
                    &34;auditing&34;: &34;default&34;
                }
            },
            &34;payload&34;: {
                &34;message&34;: {
                    &34;text&34;: question
                }
            }
        }
        return data
    
    
    def main(appid, api_key, api_secret, Spark_url,domain, question):
        wsParam = Ws_Param(appid, api_key, api_secret, Spark_url)
        websocket.enableTrace(False)
        wsUrl = wsParam.create_url()
        ws = websocket.WebSocketApp(wsUrl, on_message=on_message, on_error=on_error, on_close=on_close, on_open=on_open)
        ws.appid = appid
        ws.question = question
        ws.domain = domain
        ws.run_forever(sslopt={&34;cert_reqs&34;: ssl.CERT_NONE})
    
    2.2 Spark_Model.py

    调用讯飞大模型,获取讯飞大模型的结果

     coding: utf-8
    &34;&34;&34;
     调用讯飞大模型,获取讯飞大模型的结果
    &34;&34;&34;
    import SparkApi
    
     以下密钥信息从控制台获取   https://console.xfyun.cn/services/bm35
    appid = &34;*&34;      填写控制台中获取的 APPID 信息
    api_secret = &34;*&34;    填写控制台中获取的 APISecret 信息
    api_key =&34;*&34;     填写控制台中获取的 APIKey 信息
    
    domain = &34;generalv3.5&34;     v3.0版本
    
    Spark_url = &34;wss://spark-api.xf-yun.com/v3.5/chat&34;   v3.5环服务地址
    
    初始上下文内容,当前可传system、user、assistant 等角色
    text =[
         {&34;role&34;: &34;system&34;, &34;content&34;: &34;你现在扮演李白,你豪情万丈,狂放不羁;接下来请用李白的口吻和用户对话。&34;} ,  设置对话背景或者模型角色
         {&34;role&34;: &34;user&34;, &34;content&34;: &34;你是谁&34;},   用户的历史问题
         {&34;role&34;: &34;assistant&34;, &34;content&34;: &34;.....&34;} ,  AI的历史回答结果
          ....... 省略的历史对话
         {&34;role&34;: &34;user&34;, &34;content&34;: &34;你会做什么&34;}   最新的一条问题,如无需上下文,可只传最新一条问题
    ]
    
    
    def getText(role,content):
        jsoncon = {}
        jsoncon[&34;role&34;] = role
        jsoncon[&34;content&34;] = content
        text.append(jsoncon)
        return text
    
    def getlength(text):
        length = 0
        for content in text:
            temp = content[&34;content&34;]
            leng = len(temp)
            length += leng
        return length
    
    def checklen(text):
        while (getlength(text) > 8000):
            del text[0]
        return text
        
    
    def Api_Run(input):
         这里是运行主函数
        try:
            question = checklen(getText(&34;user&34;, input))
            print(question)
            SparkApi.answer = &34;&34;
            print(&34;星火:&34;, end=&34;&34;)
            SparkApi.main(appid, api_key, api_secret, Spark_url, domain, question)
             print(SparkApi.answer)
            output = getText(&34;assistant&34;, SparkApi.answer)
            return output[1][&39;content&39;]
        except Exception as e:
            print(e)
    
    2.3 Xufi_Voice.py

    调用讯飞语音合成API,实现文本转为优化后的语音输出

     -*- coding:utf-8 -*-
    &34;&34;&34;
     调用讯飞语音合成API,实现文本转为优化后的语音输出
    &34;&34;&34;
    from Spark_Model import appid,api_secret,api_key
    import websocket
    import datetime
    import hashlib
    import base64
    import hmac
    import json
    from urllib.parse import urlencode
    import time
    import ssl
    from wsgiref.handlers import format_date_time
    from datetime import datetime
    from time import mktime
    import _thread as thread
    import os
    import pyaudio
    import wave
    
    
    STATUS_FIRST_FRAME = 0   第一帧的标识
    STATUS_CONTINUE_FRAME = 1   中间帧标识
    STATUS_LAST_FRAME = 2   最后一帧的标识
    
     url生产类
    class Ws_Param(object):
         初始化
        def __init__(self, APPID, APIKey, APISecret, Text):
            self.APPID = APPID
            self.APIKey = APIKey
            self.APISecret = APISecret
            self.Text = Text
    
             公共参数(common)
            self.CommonArgs = {&34;app_id&34;: self.APPID}
             业务参数(business),更多个性化参数可在官网查看
            self.BusinessArgs = {&34;aue&34;: &34;raw&34;, &34;auf&34;: &34;audio/L16;rate=16000&34;, &34;vcn&34;: &34;x4_lingfeizhe_emo&34;, &34;tte&34;: &34;utf8&34;}
            self.Data = {&34;status&34;: 2, &34;text&34;: str(base64.b64encode(self.Text.encode(&39;utf-8&39;)), &34;UTF8&34;)}
            使用小语种须使用以下方式,此处的unicode指的是 utf16小端的编码方式,即&34;UTF-16LE&34;”
            self.Data = {&34;status&34;: 2, &34;text&34;: str(base64.b64encode(self.Text.encode(&39;utf-16&39;)), &34;UTF8&34;)}
    
         生成url
        def create_url(self):
            url = &39;wss://tts-api.xfyun.cn/v2/tts&39;
             生成RFC1123格式的时间戳
            now = datetime.now()
            date = format_date_time(mktime(now.timetuple()))
    
             拼接字符串
            signature_origin = &34;host: &34; + &34;tts-api.xfyun.cn&34; + &34;\n&34;
            signature_origin += &34;date: &34; + date + &34;\n&34;
            signature_origin += &34;GET &34; + &34;/v2/tts &34; + &34;HTTP/1.1&34;
             进行hmac-sha256进行加密
            signature_sha = hmac.new(self.APISecret.encode(&39;utf-8&39;), signature_origin.encode(&39;utf-8&39;),
                                     digestmod=hashlib.sha256).digest()
            signature_sha = base64.b64encode(signature_sha).decode(encoding=&39;utf-8&39;)
    
            authorization_origin = &34;api_key=\&34;%s\&34;, algorithm=\&34;%s\&34;, headers=\&34;%s\&34;, signature=\&34;%s\&34;&34; % (
                self.APIKey, &34;hmac-sha256&34;, &34;host date request-line&34;, signature_sha)
            authorization = base64.b64encode(authorization_origin.encode(&39;utf-8&39;)).decode(encoding=&39;utf-8&39;)
             将请求的鉴权参数组合为字典
            v = {
                &34;authorization&34;: authorization,
                &34;date&34;: date,
                &34;host&34;: &34;tts-api.xfyun.cn&34;
            }
             拼接鉴权参数,生成url
            url = url + &39;?&39; + urlencode(v)
             print(&34;date: &34;,date)
             print(&34;v: &34;,v)
             此处打印出建立连接时候的url,参考本demo的时候可取消上方打印的注释,比对相同参数时生成的url与自己代码生成的url是否一致
             print(&39;websocket url :&39;, url)
            return url
    
     API调用类
    class Make_Sound(object):
         初始化
        def __init__(self, output_pcm,output_wav,wsParam):
            self.output_pcm = output_pcm
            self.output_wav = output_wav
            self.wsParam = wsParam
    
         收到message
        def on_message(self,ws, message):
            try:
                message =json.loads(message)
                code = message[&34;code&34;]
                sid = message[&34;sid&34;]
                audio = message[&34;data&34;][&34;audio&34;]
                audio = base64.b64decode(audio)
                status = message[&34;data&34;][&34;status&34;]
                print(message)
                if status == 2:
                    print(&34;ws is closed&34;)
                    ws.close()
                if code != 0:
                    errMsg = message[&34;message&34;]
                    print(&34;sid:%s call error:%s code is:%s&34; % (sid, errMsg, code))
                else:
                    with open(self.output_pcm, &39;ab&39;) as f:
                        f.write(audio)
    
            except Exception as e:
                print(&34;receive msg,but parse exception:&34;, e)
    
    
    
         收到websocket错误的处理
        def on_error(self,ws, error):
            print(&34; error:&34;, error)
    
    
         收到websocket关闭的处理
        def on_close(self,ws,*args):
            print(&34; closed &34;)
    
    
         收到websocket连接建立的处理
        def on_open(self,ws):
            output_pcm = self.output_pcm
            wsParam = self.wsParam
            def run(*args):
                d = {&34;common&34;: wsParam.CommonArgs,
                     &34;business&34;: wsParam.BusinessArgs,
                     &34;data&34;: wsParam.Data,
                     }
                d = json.dumps(d)
                print(&34;------>开始发送文本数据&34;)
                ws.send(d)
                if os.path.exists(output_pcm):
                    os.remove(output_pcm)
    
            thread.start_new_thread(run, ())
    
    
         pcm转换为wav
        def pcm_2_wav(self):
    
            with open(self.output_pcm, &39;rb&39;) as pcmfile:
                pcmdata = pcmfile.read()
            with wave.open(self.output_wav, &39;wb&39;) as wavfile:
                wavfile.setparams((1, 2, 16000, 0, &39;NONE&39;, &39;NONE&39;))
                wavfile.writeframes(pcmdata)
            now_time2 = time.strftime(&34;%Y-%m-%d %H:%M:%S&34;, time.gmtime())
            print(&34;转wav结束时间&34;+str(now_time2))
    
         播放wav文件
        def sound_out(self):
            CHUNK = 1024
            FORMAT = pyaudio.paInt16
            CHANNELS = 1
            RATE = 16000
    
            wf = wave.open(self.output_wav, &39;rb&39;)
            p = pyaudio.PyAudio()
            stream = p.open(format=FORMAT,
                            channels=CHANNELS,
                            rate=RATE,
                            output=True,
                            frames_per_buffer=CHUNK, )
    
            data = wf.readframes(CHUNK)
    
            while len(data) > 0:
                stream.write(data)
                data = wf.readframes(CHUNK)
    
            stream.stop_stream()
            stream.close()
    
            p.terminate()
    
    
    def Run_Voice(output_pcm,output_wav,text):
         这里的信息来自Spark_Model.py中 appid、api_secret、api_key
        wsParam = Ws_Param(APPID=appid,APISecret=api_secret,APIKey=api_key,Text=text)
        websocket.enableTrace(False)
        wsUrl = wsParam.create_url()
        ms = Make_Sound(output_pcm, output_wav,wsParam)
        ws = websocket.WebSocketApp(wsUrl, on_message=ms.on_message, on_error=ms.on_error, on_close=ms.on_close)
        ws.on_open = ms.on_open
        ws.run_forever(sslopt={&34;cert_reqs&34;: ssl.CERT_NONE})
    
         文本转为pcm
        ms.pcm_2_wav()
    
         播放wav文件
        ms.sound_out()
        now_time = time.strftime(&34;%Y-%m-%d %H:%M:%S&34;, time.gmtime())
        print(&34;播放完毕:&34; + str(now_time))
    
    

    2.4 main.py

    主运行程序,调用百度语音识别API,实现语音唤醒,进行语音交互

     -*- coding:utf-8 -*-
    &34;&34;&34;
     1.先进行语音唤醒
     这里如何实现:
     2.语音助手进行应答反馈
     3.用户语音咨询,语音需要转为文本
     4.文本告诉助手,助手调用许飞大模型
     5.获取讯飞大模型的结果
     6.文本转为语音反馈
    &34;&34;&34;
    from Xufi_Voice import Run_Voice
    import Spark_Model
    import win32com.client
    import pyaudio
    import wave
    from aip import AipSpeech
    import os
    
    
    class Wake_Up:
    
        def __init__(self,APP_ID,API_KEY,SECRET_KEY,file_path):
            self.APP_ID = APP_ID
            self.API_KEY = API_KEY
            self.SECRET_KEY = SECRET_KEY
            self.speaker = win32com.client.Dispatch(&34;SAPI.SpVoice&34;)   window系统语音
            self.file_path = file_path
    
        def record_sound(self):
             获取语音 唤醒循环,只到获得唤醒词 &34;鸭蛋鸭蛋&34;或者 &34;鸭蛋&34;为止
            CHUNK = 1024
            FORMAT = pyaudio.paInt16
            CHANNELS = 1
            RATE = 16000
            RECORD_SECONDS = 5
            WAVE_OUTPUT_FILENAME = self.file_path
            pau = pyaudio.PyAudio()
            stream = pau.open(format=FORMAT,
                              channels=CHANNELS,
                              rate=RATE,
                              input=True,
                              frames_per_buffer=CHUNK, )
            frames = []
            print(&34;请说&34;)
             self.speaker.Speak(&34;请说&34;)
            for i in range(0, int(RATE / CHUNK * RECORD_SECONDS)):
                data = stream.read(CHUNK)
                frames.append(data)
            print(&34;好的,等待您的吩咐&34;)
             self.speaker.Speak(&34;好的,已经了解您的需求,请我思考一下&34;)
            stream.stop_stream()
            stream.close()
            pau.terminate()
            wf = wave.open(WAVE_OUTPUT_FILENAME, &39;wb&39;)
            wf.setnchannels(CHANNELS)
            wf.setsampwidth(pau.get_sample_size(FORMAT))
            wf.setframerate(RATE)
            wf.writeframes(b&39;&39;.join(frames))
            wf.close()
    
        def voice2text(self):
             语音转文本
            client = AipSpeech(self.APP_ID, self.API_KEY, self.SECRET_KEY)
            ret = client.asr(self.get_data(), &39;pcm&39;, 16000, {&39;dev_pid&39;: 1536}, )
             print(ret)
            if ret[&39;err_msg&39;] == &39;recognition error.&39;:
                result = &39;&39;
                return result
            else:
                result = ret[&39;result&39;]
                return result
    
        def get_data(self):
             读取语音
            with open(self.file_path, &39;rb&39;) as fp:
                return fp.read()
    
        def del_file(self):
    
            file_name = self.file_path
            try:
                os.remove(file_name)
                print(f&34;Successful deleted {file_name}&34;)
                f = open(file_name, mode=&34;w&34;)   音频-图片-视频  mode=&34;wb&34;
                f.close()
                print(f&34;Successful maked {file_name}&34;)
    
            except FileNotFoundError:
                print(f&34;{file_name} not found&34;)
    
    
    def Run_Talk(APP_ID,API_KEY,SECRET_KEY,file_path):
    
        output_pcm = &39;./data/demo.pcm&39;
        output_wav = &39;./data/demo.wav&39;
         实例化对象
        wk = Wake_Up(APP_ID,API_KEY,SECRET_KEY,file_path)
    
        while True:
             先调用录音函数
            wk.record_sound()
             语音转成文字的内容
            chat_message = wk.voice2text()
            print(chat_message)
            if len(chat_message) > 0 and chat_message[0] == &39;今天&39;:
                 语音已唤醒
                wk.del_file()
                print(&39;语音唤醒完毕&39;)
                Run_Voice(output_pcm, output_wav, &39;我在,请问有何吩咐&39;)
                 wk.speaker.Speak(&34;我在,请问有何吩咐&34;)
                print(&39;我在,请问有何吩咐&39;)
                wk.record_sound()
                 唤醒后,需求调用
                chat_message = wk.voice2text()
                 wk.speaker.Speak(&39;好的,请稍等&39;)
                Run_Voice(output_pcm, output_wav, &39;好的,请稍等&39;)
                print(chat_message)
                 调用Spark_Xufi_Model
                if len(chat_message) > 0:
                    Input = chat_message[0]
                    output = Spark_Model.Api_Run(Input)
                    print(output)
                     wk.speaker.Speak(output)
                    Run_Voice(output_pcm, output_wav, output)
                break
            else:
                continue
    
    
    if __name__ == &39;__main__&39;:
         存放的文件名称
        file_path = &34;./data/chat-audio.wav&34;
         百度需要的参数
        APP_ID = &39;63&39;
        API_KEY = &39;mNfTy&39;
        SECRET_KEY = &39;29gS&39;
        Run_Talk(APP_ID,API_KEY,SECRET_KEY,file_path)
    
    

    3 项目缺陷

    1.大模型反馈信息太多,不精炼

    2.语音识别在嘈杂环境下,会受干扰

    3.不能连续语音,上下文交换,但是可以优化

    4 运行问题反馈(2025-06-24)

    如果运行中报错,且报错位置在Xufi_Voice.py文件中的pcm_2_wav,经分析原因是讯飞语音合成发音人失效了,这里替换为基础发音人即可,如xiaoyan