2023年7月10日发(作者:)
Python讯飞语⾳转⽂字保存到⽂件因为有朋友需要将录⾳转成⽂字, 给我的是m4a格式, 我给转成txt发给他的.我找了找, 发现⽹上很多都是收费软件,⽽且转换结果不尽⼈意., 最后决定使⽤讯飞服务来完成转换, 讯飞语⾳转写api⽀持⼤⽂件, 转换结果也⼗分准确, 感谢讯飞及制作demo的⼤佬yanmeng2, 嘻嘻1先注册账号, 获取key…2 修改代码, 将值输⼊并确定,3 执⾏打开cmd, 输⼊python D:workspacepythonweblfasr_python3_demoweblfasr_python3_执⾏完毕后, 就可以在 代码设置的to_file中看到结果了源码:# -*- coding: utf-8 -*-#
# author: yanmeng2#
#
⾮实时转写调⽤demoimport base64import hashlibimport hmacimport jsonimport osimport timeimport requestslfasr_host = '/api'#
请求的接⼝名api_prepare = '/prepare'api_upload = '/upload'api_merge = '/merge'api_get_progress = '/getProgress'api_get_result = '/getResult'#
⽂件分⽚⼤⼩10Mfile_piece_sice = 10485760# ——————————————————转写可配置参数————————————————#
参数可在官⽹界⾯(/rest_api/%E8%AF%AD%E9%9F%B3%E8%BD%AC%E5%86%)查看,根据需求可⾃⾏在gene_params⽅法⾥添加修改#
转写类型lfasr_type = 0#
是否开启分词has_participle = 'false'has_seperate = 'true'#
多候选词个数max_alternatives = 0#
⼦⽤户标识suid = ''class SliceIdGenerator: """slice id⽣成器""" def __init__(self): self.__ch = 'aaaaaaaaa`' def getNextSliceId(self): ch = self.__ch j = len(ch) - 1 while j >= 0: cj = ch[j] if cj != 'z': ch = ch[:j] + chr(ord(cj) + 1) + ch[j + 1:] break else: ch = ch[:j] + 'a' + ch[j + 1:] j = j - 1 self.__ch = ch return self.__chclass RequestApi(object): def __init__(self, appid, secret_key, upload_file_path, to_file): = appid _key = secret_key _file_path = upload_file_path _file = to_file #
根据不同的apiname⽣成不同的参数,本⽰例中未使⽤全部参数您可在官⽹(/rest_api/%E8%AF%AD%E9%9F%B3%E8%BD%AC%E5%86%)查看后选择适合业务场景的进⾏更换 def gene_params(self, apiname, taskid=None, slice_id=None): appid = secret_key = _key upload_file_path = _file_path ts = str(int(())) m2 = 5() ((appid + ts).encode('utf-8')) md5 = est() md5 = bytes(md5, encoding='utf-8') #
以secret_key为key,
上⾯的md5为msg,
使⽤1加密结果为signa signa = (secret_('utf-8'), md5, 1).digest() signa = base64.b64encode(signa) signa = str(signa, 'utf-8') file_len = e(upload_file_path) file_name = me(upload_file_path) param_dict = {} if apiname == api_prepare: # slice_num是指分⽚数量,如果您使⽤的⾳频都是较短⾳频也可以不分⽚,直接将slice_num指定为1即可 slice_num = int(file_len / file_piece_sice) + (0 if (file_len % file_piece_sice == 0) else 1) param_dict['app_id'] = appid param_dict['signa'] = signa param_dict['ts'] = ts param_dict['file_len'] = str(file_len) param_dict['file_name'] = file_name param_dict['slice_num'] = str(slice_num) elif apiname == api_upload: param_dict['app_id'] = appid param_dict['signa'] = signa param_dict['ts'] = ts param_dict['task_id'] = taskid param_dict['slice_id'] = slice_id elif apiname == api_merge: param_dict['app_id'] = appid param_dict['signa'] = signa param_dict['ts'] = ts param_dict['task_id'] = taskid param_dict['file_name'] = file_name elif apiname == api_get_progress or apiname == api_get_result: elif apiname == api_get_progress or apiname == api_get_result: param_dict['app_id'] = appid param_dict['signa'] = signa param_dict['ts'] = ts param_dict['task_id'] = taskid return param_dict #
请求和结果解析,结果中各个字段的含义可参考:/rest_api/%E8%AF%AD%E9%9F%B3%E8%BD%AC%E5%86% def gene_request(self, apiname, data, files=None, headers=None): response = (lfasr_host + apiname, data=data, files=files, headers=headers) result = () if result["ok"] == 0: print("{} success:".format(apiname) + str(result)) return result else: print("{} error:".format(apiname) + str(result)) exit(0) return result #
预处理 def prepare_request(self): return _request(apiname=api_prepare, data=_params(api_prepare)) #
上传 def upload_request(self, taskid, upload_file_path): file_object = open(upload_file_path, 'rb') try: index = 1 sig = SliceIdGenerator() while True: content = file_(file_piece_sice) if not content or len(content) == 0: break files = { "filename": _params(api_upload).get("slice_id"), "content": content } response = _request(api_upload, data=_params(api_upload, taskid=taskid, slice_id=tSliceId()), files=files) if ('ok') != 0: #
上传分⽚失败 print('upload slice fail, response: ' + str(response)) return False print('upload slice ' + str(index) + ' success') index += 1 finally: 'file index:' + str(file_()) file_() return True #
合并 def merge_request(self, taskid): return _request(api_merge, data=_params(api_merge, taskid=taskid)) #
获取进度 def get_progress_request(self, taskid): return _request(api_get_progress, data=_params(api_get_progress, taskid=taskid)) #
获取结果 def get_result_request(self, taskid): return _request(api_get_result, data=_params(api_get_result, taskid=taskid)) def all_api_request(self): def all_api_request(self): # 1.
预处理 pre_result = e_request() taskid = pre_result["data"] # 2 .
分⽚上传 _request(taskid=taskid, upload_file_path=_file_path) # 3 .
⽂件合并 _request(taskid=taskid) # 4 .
获取任务进度 while True: #
每隔20秒获取⼀次任务进度 progress = _progress_request(taskid) progress_dic = progress if progress_dic['err_no'] != 0 and progress_dic['err_no'] != 26605: print('task error: ' + progress_dic['failed']) return else: data = progress_dic['data'] task_status = (data) if task_status['status'] == 9: print('task ' + taskid + ' finished') break print('The task ' + taskid + ' is in processing, task status: ' + str(data)) #
每次获取进度间隔20S (20) # 5 .
获取结果 res = _result_request(taskid=taskid) r = (res["data"]) with open(_file,'w', encoding='utf-8') as file_obj: for iterating_var in r: file_(iterating_var["onebest"]) file_("n")#
注意:如果出现requests模块报错:"NoneType" object has no attribute 'read',
请尝试将requests模块更新到2.20.0或以上版本(本demo测试版本为2.20.0)#
输⼊讯飞开放平台的appid,secret_key和待转写的⽂件路径if __name__ == '__main__': api = RequestApi( appid="xxx", secret_key="xxx", upload_file_path=r"C:/Users/DELL/Desktop/录⾳⽂件/20201104_163233.m4a", to_file=r"C:/Users/DELL/Desktop/录⾳⽂件/20201104_" ) _api_request()
发布者:admin,转转请注明出处:http://www.yc00.com/news/1688931569a184830.html
评论列表(0条)