recipes/data/fisher/utils.py (116 lines of code) (raw):

""" Copyright (c) Facebook, Inc. and its affiliates. All rights reserved. This source code is licensed under the BSD-style license found in the LICENSE file in the root directory of this source tree. """ from __future__ import absolute_import, division, print_function, unicode_literals import os import sox def find_files(src): src_dirs = src.split(",") required_dirs = [ "fe_03_p1_sph1", "fe_03_p1_sph3", "fe_03_p1_sph5", "fe_03_p1_sph7", "fe_03_p2_sph1", "fe_03_p2_sph3", "fe_03_p2_sph5", "fe_03_p2_sph7", "fe_03_p1_sph2", "fe_03_p1_sph4", "fe_03_p1_sph6", "fe_03_p2_sph2", "fe_03_p2_sph4", "fe_03_p2_sph6", "fe_03_p1_tran", "fe_03_p2_tran", ] dir_mapping = {} for dir in src_dirs: for curdir in os.listdir(dir): fulldir = os.path.join(dir, curdir) if not os.path.isdir(fulldir): continue for req_dir in required_dirs: new_style_req_dir = req_dir.replace( "fe_03_p1_sph", "fisher_eng_tr_sp_d" ) if curdir == req_dir or curdir == new_style_req_dir: dir_mapping[req_dir] = fulldir continue transcript_files = {} audio_files = {} for dir in required_dirs: assert dir in dir_mapping, "could not find the subdirectory {}".format(dir) fulldir = dir_mapping[dir] if "tran" in fulldir: fulldir = os.path.join(fulldir, "data") for dirpath, _, filenames in os.walk(fulldir): for filename in filenames: key = filename.split(".")[0] if filename.startswith("fe_") and filename.endswith(".txt"): transcript_files[key] = os.path.join(dirpath, filename) elif filename.endswith(".sph"): audio_files[key] = os.path.join(dirpath, filename) return [(audio_files[k], transcript_files[k]) for k in audio_files] def process_fisher_data(sample_data): files, _, audio_path, sph2pipe = sample_data sphfile, tfile = files tmp_files = {} for channel in ["A", "B"]: tmp_files[channel] = os.path.join( audio_path, "{pid}_tmp_{ch}.wav".format(pid=os.getpid(), ch=channel) ) os.system( "{sph} -f wav -c {c} {i} {o}".format( sph=sph2pipe, c=1 if channel == "A" else 2, i=sphfile, o=tmp_files[channel], ) ) idx = 0 lines = [] with open(tfile, "r") as f: first_line = f.readline().strip() assert first_line.startswith("#") and first_line.endswith(".sph") audiofileid = first_line.replace("#", "").replace(".sph", "").strip() cur_audio_path = os.path.join(audio_path, audiofileid) os.makedirs(cur_audio_path, exist_ok=True) for line in f: if line.startswith("#") or not line.strip(): continue tag, text = line.strip().split(":", 1) start, end, channel = tag.split() start = float(start) end = float(end) utt = "{a}-{c}-{s}-{e}".format( a=audiofileid, c=channel, s="{:06d}".format(int(start * 100 + 0.5)), e="{:06d}".format(int(end * 100 + 0.5)), ) # ignore uncertain annotations if "((" in text: continue # lower-case text = text.lower() # remove punctuation text = text.replace("?", "") text = text.replace(",", "") # simplify noise annotations text = text.replace("[[skip]]", "") text = text.replace("[pause]", "") text = text.replace("[laugh]", "[laughter]") text = text.replace("[sigh]", "[noise]") text = text.replace("[cough]", "[noise]") text = text.replace("[mn]", "[noise]") text = text.replace("[breath]", "[noise]") text = text.replace("[lipsmack]", "[noise]") text = text.replace("[sneeze]", "[noise]") text = " ".join(text.split()) out_file = os.path.join(cur_audio_path, "{:09d}.flac".format(idx)) sox_tfm = sox.Transformer() sox_tfm.set_output_format( file_type="flac", encoding="signed-integer", bits=16 ) sox_tfm.trim(start, end) sox_tfm.build(tmp_files[channel], out_file) duration = (end - start) * 1000.0 idx = idx + 1 lines.append("\t".join([utt, out_file, "{0:.2f}".format(duration), text])) # cleanup for tmp in tmp_files.values(): os.remove(tmp) return lines