ESP32-S3-CAM豆包语音识别文字后控制小车——规划

前面帖子里好不容易注册好了后台接口，但是怎么用？那个接入文档非常细，但是内容太多太复杂。后来有次出差的时候想了下，要不然先把官网给的 Python 代码跑通（C 代码 142M，确实没有勇气下载，主要是没有时间去细看）。于是继续：安装 Python 环境（版本 3.12.X，注意添加环境变量），安装 PyCharm IDE 环境，这些都可以问豆包或者 Kimi。上面安装过程就不一一介绍了，网上大把，常规操作：Python 安装环境从清华镜像下载比较快，PyCharm 从官网下载最新的。

安装完以后把下面这个 py 文件下载下来，本以为直接就能跑通。代码中这几个字段需要自己根据前面帖子里项目管理页面去复制粘贴，audio_path 就是音频本地文件在你电脑上的相对路径：

appid = "xxx"          # 项目的 appid
token = "xxx"          # 项目的 token
cluster = "xxx"        # 请求的集群
audio_path = "..."     # 本地音频路径
audio_format = "wav"   # wav 或者 mp3，根据实际音频格式设置

在 PyCharm 项目目录里添加了一个资源文件，其他字段根据前面注册的情况填写即可。编译运行后结果报错一堆：

E:\PythonProject\.venv\Scripts\python.exe E:\PythonProject\streaming_asr_demo_0.py
Traceback (most recent call last):
  File "E:\PythonProject\streaming_asr_demo_0.py", line 357, in <module>
    test_one()
  File "E:\PythonProject\streaming_asr_demo_0.py", line 343, in test_one
    result = execute_one(
             ^^^^^^^^^^^^
  File "E:\PythonProject\streaming_asr_demo_0.py", line 339, in execute_one
    result = asyncio.run(asr_http_client.execute())
             ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "C:\Python312\Lib\asyncio\runners.py", line 195, in run
    return runner.run(main)
           ^^^^^^^^^^^^^^^^
  File "C:\Python312\Lib\asyncio\runners.py", line 118, in run
    return self._loop.run_until_complete(task)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "C:\Python312\Lib\asyncio\base_events.py", line 691, in run_until_complete
    return future.result()
           ^^^^^^^^^^^^^^^
  File "E:\PythonProject\streaming_asr_demo_0.py", line 306, in execute
    with open(self.audio_path, mode="rb") as _f:
         ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
FileNotFoundError: [Errno 2] No such file or directory: ...

丢给 Kimi，代码修复后就可以跑了：

# coding=utf-8

"""
requires Python 3.6 or later

pip install asyncio
pip install websockets
"""

import asyncio
import base64
import gzip
import hmac
import json
import logging
import os
import uuid
import wave
from enum import Enum
from hashlib import sha256
from io import BytesIO
from typing import List
from urllib.parse import urlparse
import time

import websockets

appid = ""    # 项目的 appid
token = ""    # 项目的 token
cluster = "volcengine_input_common"    # 请求的集群
audio_path = "wav/audio1.wav"    #
本地音频路径 audio_format wav # wav 或者 mp3根据实际音频格式设置 PROTOCOL_VERSION 0b0001 DEFAULT_HEADER_SIZE 0b0001 PROTOCOL_VERSION_BITS 4 HEADER_BITS 4 MESSAGE_TYPE_BITS 4 MESSAGE_TYPE_SPECIFIC_FLAGS_BITS 4 MESSAGE_SERIALIZATION_BITS 4 MESSAGE_COMPRESSION_BITS 4 RESERVED_BITS 8 # Message Type: CLIENT_FULL_REQUEST 0b0001 CLIENT_AUDIO_ONLY_REQUEST 0b0010 SERVER_FULL_RESPONSE 0b1001 SERVER_ACK 0b1011 SERVER_ERROR_RESPONSE 0b1111 # Message Type Specific Flags NO_SEQUENCE 0b0000 # no check sequence POS_SEQUENCE 0b0001 NEG_SEQUENCE 0b0010 NEG_SEQUENCE_1 0b0011 # Message Serialization NO_SERIALIZATION 0b0000 JSON 0b0001 THRIFT 0b0011 CUSTOM_TYPE 0b1111 # Message Compression NO_COMPRESSION 0b0000 GZIP 0b0001 CUSTOM_COMPRESSION 0b1111 def generate_header( versionPROTOCOL_VERSION, message_typeCLIENT_FULL_REQUEST, message_type_specific_flagsNO_SEQUENCE, serial_methodJSON, compression_typeGZIP, reserved_data0x00, extension_headerbytes() ): protocol_version(4 bits), header_size(4 bits), message_type(4 bits), message_type_specific_flags(4 bits) serialization_method(4 bits) message_compression(4 bits) reserved 8bits) 保留字段 header_extensions 扩展头(大小等于 8 * 4 * (header_size - 1) ) header bytearray() header_size int(len(extension_header) / 4) 1 header.append((version 4) | header_size) header.append((message_type 4) | message_type_specific_flags) header.append((serial_method 4) | compression_type) header.append(reserved_data) header.extend(extension_header) return header def generate_full_default_header(): return generate_header() def generate_audio_default_header(): return generate_header( message_typeCLIENT_AUDIO_ONLY_REQUEST ) def generate_last_audio_default_header(): return generate_header( message_typeCLIENT_AUDIO_ONLY_REQUEST, message_type_specific_flagsNEG_SEQUENCE ) def parse_response(res): protocol_version(4 bits), header_size(4 bits), message_type(4 bits), message_type_specific_flags(4 bits) serialization_method(4 bits) message_compression(4 bits) reserved 8bits) 保留字段 header_extensions 
扩展头(大小等于 8 * 4 * (header_size - 1) ) payload 类似与http 请求体 protocol_version res[0] 4 header_size res[0] 0x0f message_type res[1] 4 message_type_specific_flags res[1] 0x0f serialization_method res[2] 4 message_compression res[2] 0x0f reserved res[3] header_extensions res[4:header_size * 4] payload res[header_size * 4:] result {} payload_msg None payload_size 0 if message_type SERVER_FULL_RESPONSE: payload_size int.from_bytes(payload[:4], big, signedTrue) payload_msg payload[4:] elif message_type SERVER_ACK: seq int.from_bytes(payload[:4], big, signedTrue) result[seq] seq if len(payload) 8: payload_size int.from_bytes(payload[4:8], big, signedFalse) payload_msg payload[8:] elif message_type SERVER_ERROR_RESPONSE: code int.from_bytes(payload[:4], big, signedFalse) result[code] code payload_size int.from_bytes(payload[4:8], big, signedFalse) payload_msg payload[8:] if payload_msg is None: return result if message_compression GZIP: payload_msg gzip.decompress(payload_msg) if serialization_method JSON: payload_msg json.loads(str(payload_msg, utf-8)) elif serialization_method ! 
NO_SERIALIZATION: payload_msg str(payload_msg, utf-8) result[payload_msg] payload_msg result[payload_size] payload_size return result def read_wav_info(data: bytes None) - (int, int, int, int, int): with BytesIO(data) as _f: wave_fp wave.open(_f, rb) nchannels, sampwidth, framerate, nframes wave_fp.getparams()[:4] wave_bytes wave_fp.readframes(nframes) return nchannels, sampwidth, framerate, nframes, len(wave_bytes) class AudioType(Enum): LOCAL 1 # 使用本地音频文件 class AsrWsClient: def __init__(self, audio_path, cluster, **kwargs): :param config: config self.audio_path audio_path self.cluster cluster self.success_code 1000 # success code, default is 1000 self.seg_duration int(kwargs.get(seg_duration, 15000)) self.nbest int(kwargs.get(nbest, 1)) self.appid kwargs.get(appid, ) self.token kwargs.get(token, ) self.ws_url kwargs.get(ws_url, wss://openspeech.bytedance.com/api/v2/asr) self.uid kwargs.get(uid, streaming_asr_demo) self.workflow kwargs.get(workflow, audio_in,resample,partition,vad,fe,decode,itn,nlu_punctuate) self.show_language kwargs.get(show_language, False) self.show_utterances kwargs.get(show_utterances, False) self.result_type kwargs.get(result_type, full) self.format kwargs.get(format, wav) self.rate kwargs.get(sample_rate, 16000) self.language kwargs.get(language, zh-CN) self.bits kwargs.get(bits, 16) self.channel kwargs.get(channel, 1) self.codec kwargs.get(codec, raw) self.audio_type kwargs.get(audio_type, AudioType.LOCAL) self.secret kwargs.get(secret, access_secret) self.auth_method kwargs.get(auth_method, token) self.mp3_seg_size int(kwargs.get(mp3_seg_size, 10000)) def construct_request(self, reqid): req { app: { appid: self.appid, cluster: self.cluster, token: self.token, }, user: { uid: self.uid }, request: { reqid: reqid, nbest: self.nbest, workflow: self.workflow, show_language: self.show_language, show_utterances: self.show_utterances, result_type: self.result_type, sequence: 1 }, audio: { format: self.format, rate: self.rate, language: 
self.language, bits: self.bits, channel: self.channel, codec: self.codec } } return req staticmethod def slice_data(data: bytes, chunk_size: int) - (list, bool): slice data :param data: wav data :param chunk_size: the segment size in one request :return: segment data, last flag data_len len(data) offset 0 while offset chunk_size data_len: yield data[offset: offset chunk_size], False offset chunk_size else: yield data[offset: data_len], True def _real_processor(self, request_params: dict) - dict: pass def token_auth(self): return {Authorization: Bearer; {}.format(self.token)} def signature_auth(self, data): header_dicts { Custom: auth_custom, } url_parse urlparse(self.ws_url) input_str GET {} HTTP/1.1\n.format(url_parse.path) auth_headers Custom for header in auth_headers.split(,): input_str {}\n.format(header_dicts[header]) input_data bytearray(input_str, utf-8) input_data data mac base64.urlsafe_b64encode( hmac.new(self.secret.encode(utf-8), input_data, digestmodsha256).digest()) header_dicts[Authorization] HMAC256; access_token{}; mac{}; h{}.format(self.token, str(mac, utf-8), auth_headers) return header_dicts async def segment_data_processor(self, wav_data: bytes, segment_size: int): reqid str(uuid.uuid4()) # 构建 full client request并序列化压缩 request_params self.construct_request(reqid) payload_bytes str.encode(json.dumps(request_params)) payload_bytes gzip.compress(payload_bytes) full_client_request bytearray(generate_full_default_header()) full_client_request.extend((len(payload_bytes)).to_bytes(4, big)) # payload size(4 bytes) full_client_request.extend(payload_bytes) # payload header None if self.auth_method token: header self.token_auth() elif self.auth_method signature: header self.signature_auth(full_client_request) async with websockets.connect(self.ws_url, additional_headersheader, max_size1000000000) as ws: # 发送 full client request await ws.send(full_client_request) res await ws.recv() result parse_response(res) if payload_msg in result and 
result[payload_msg][code] ! self.success_code: return result for seq, (chunk, last) in enumerate(AsrWsClient.slice_data(wav_data, segment_size), 1): # if no compression, comment this line payload_bytes gzip.compress(chunk) audio_only_request bytearray(generate_audio_default_header()) if last: audio_only_request bytearray(generate_last_audio_default_header()) audio_only_request.extend((len(payload_bytes)).to_bytes(4, big)) # payload size(4 bytes) audio_only_request.extend(payload_bytes) # payload # 发送 audio-only client request await ws.send(audio_only_request) res await ws.recv() result parse_response(res) if payload_msg in result and result[payload_msg][code] ! self.success_code: return result return result async def execute(self): with open(self.audio_path, moderb) as _f: data _f.read() audio_data bytes(data) if self.format mp3: segment_size self.mp3_seg_size return await self.segment_data_processor(audio_data, segment_size) if self.format ! wav: raise Exception(format should in wav or mp3) nchannels, sampwidth, framerate, nframes, wav_len read_wav_info( audio_data) size_per_sec nchannels * sampwidth * framerate segment_size int(size_per_sec * self.seg_duration / 1000) return await self.segment_data_processor(audio_data, segment_size) def execute_one(audio_item, cluster, **kwargs): :param audio_item: {id: xxx, path: xxx} :param cluster:集群名称 :return: assert id in audio_item assert path in audio_item audio_id audio_item[id] audio_path audio_item[path] audio_type AudioType.LOCAL asr_http_client AsrWsClient( audio_pathaudio_path, clustercluster, audio_typeaudio_type, **kwargs ) result asyncio.run(asr_http_client.execute()) return {id: audio_id, path: audio_path, result: result} def test_one(): result execute_one( { id: 1, path: audio_path }, clustercluster, appidappid, tokentoken, formataudio_format, ) print(result) print(ASR结果 result[result][payload_msg][result][0][text]) if __name__ __main__: test_one()最后输出的识别结果如下E:\PythonProject\.venv\Scripts\python.exe 
E:\PythonProject\streaming_asr_demo.py
{'id': 1, 'path': 'wav/audio1.wav', 'result': {'payload_msg': {'addition': {'duration': 1380, 'logid': '20260412204314AEA058073860379BBC1A', 'split_time': []}, 'code': 1000, 'message': 'Success', 'reqid': 'b52e33f1-ba2d-4a99-859c-d8007b841bd1', 'result': [{'confidence': 0, 'text': '头抬高'}], 'sequence': -2}, 'payload_size': 228}}
ASR结果: 头抬高

这个音频文件是我自己用手机录了三秒的录音（默认是 2 声道 wav 格式），然后传到电脑上的。这些文件后面也要拷贝到 SD 卡里，用于转换后的 C 代码使用。

ESP32-S3-CAM豆包语音识别文字后控制小车（三）——SD卡本地音频识别转文字