recipes/data/switchboard/utils.py (130 lines of code) (raw):

from __future__ import absolute_import, division, print_function, unicode_literals import os import re import sox def process_hub5_data(sample_data): line, idx, hub5_sdir, hub5_audio_path, sph2pipe = sample_data if (not line) or line.startswith(";;") or ("IGNORE_TIME_SEGMENT_" in line): return None parts = line.strip().split() transcript = " ".join(parts[6:]) transcript = transcript.replace("((", "(") transcript = transcript.replace("<B_ASIDE>", "") transcript = transcript.replace("<A_ASIDE>", "") spk = "{}-{}".format(parts[0], parts[1]) start = float(parts[3]) end = float(parts[4]) utt = "{u}_{s}-{e}".format( u=spk, s="{:06d}".format(int(start * 100)), e="{:06d}".format(int(end * 100)) ) in_file = os.path.join(hub5_sdir, "english", parts[0] + ".sph") out_file = os.path.join(hub5_audio_path, "{:09d}.flac".format(idx)) tmp_file = os.path.join(hub5_audio_path, "{pid}_tmp.wav".format(pid=os.getpid())) os.system( "{sph} -f wav -c {c} {i} {o}".format( sph=sph2pipe, c=1 if parts[1] == "A" else 2, i=in_file, o=tmp_file ) ) assert ( sox.file_info.duration(tmp_file) > 0 ), "Audio file {} duration is zero.".format(in_file) sox_tfm = sox.Transformer() sox_tfm.set_output_format(file_type="flac", encoding="signed-integer", bits=16) sox_tfm.trim(start, end) sox_tfm.build(tmp_file, out_file) os.remove(tmp_file) duration = (end - start) * 1000.0 return "\t".join([utt, out_file, "{0:.2f}".format(duration), transcript.lower()]) def normalize_acronyms(line, acronym_dict): # Taken from https://git.io/fjhbu # Original Author - Minhua Wu dict_acronym = {} dict_acronym_noi = {} # Mapping of acronyms without I, i for k, v in acronym_dict.items(): dict_acronym[k] = v.strip() dict_acronym_noi[k] = v.strip() del dict_acronym_noi["i"] del dict_acronym_noi["I"] line = "<dummy-id> " + line.strip() items = line.split() L = len(items) # First pass mapping to map I as part of acronym for i in range(L): if items[i] == "i": x = 0 while i - 1 - x >= 0 and re.match(r"^[A-Z]$", items[i - 1 - x]): x += 1 y = 0 while i + 1 + y < L and re.match(r"^[A-Z]$", items[i + 1 + y]): y += 1 if x + y > 0: for bias in range(-x, y + 1): items[i + bias] = dict_acronym[items[i + bias]] # Second pass mapping (not mapping 'i' and 'I') for i in range(len(items)): if items[i] in dict_acronym_noi.keys(): items[i] = dict_acronym_noi[items[i]] return " ".join(items[1:]) def sanitize(transcript, acronym_dict): cleaned_words = "" for word in transcript.split(): # Remove silence word = word.replace("[silence]", "") # Remove <b_aside>, <e_aside> (background conversation indicators) word = word.replace("<b_aside>", "") word = word.replace("<e_aside>", "") # Use special noise symbol for [vocalized-noise]. # NOTE: Kaldi doesn't do this word = word.replace("[vocalized-noise]", "[noise]") # For word containing laughter, replace [laughter-word] by word # (these word are still properly understood) # also handle cases like [laughter-ou[r]-] word = re.sub(r"(-?)\[laughter\-([\S]+)\](-?)", r"\1\2\3", word) # for anomalous word like [Bamorghini/Lamborghini], we consider the first # word as it matches more with the pronounciation word = re.sub(r"\[(\S+)\/\S+\]", r"\1", word) # handle an incorrect input: 'ex[specially]-/especially]' word = re.sub("ex.specially...especially.", "ex-", word) # For partial word like -[Substi]tute use '-tute' in word transcription word = re.sub(r"ammu\[n\]it", r"ammu-it", word) # handle case 'ammu[n]it[ion]-' word = re.sub(r"\-\[[^\]\s]+\]", r"-", word) word = re.sub(r"\[[^\[\s]+\]\-", r"-", word) # for coinages like {DJed}, {yuppyish} remove curly braces around them word = re.sub(r"[\{\}]+", r"", word) # For common alternate pronunciations like about_1 -> b aw t, them_1 eh m, # remove '_1' word = re.sub(r"_\d$", r"", word) word = re.sub(r"them_1's", r"them's", word) # handle case 'them_1's' cleaned_words += word + " " # Normalize acronyms to Fisher format BBC -> b._b._c. return normalize_acronyms(cleaned_words, acronym_dict) def process_swbd_data(sample_data): data, _, swbd_audio_path, sph2pipe, acronym_dict = sample_data id, sphfile, chA, chB = data tmp_file = os.path.join(swbd_audio_path, "{pid}_tmp.wav".format(pid=os.getpid())) cur_audio_path = os.path.join(swbd_audio_path, id) os.makedirs(cur_audio_path, exist_ok=True) idx = 0 lines = [] for channel in ["A", "B"]: os.system( "{sph} -f wav -c {c} {i} {o}".format( sph=sph2pipe, c=1 if channel == "A" else 2, i=sphfile, o=tmp_file ) ) assert ( sox.file_info.duration(tmp_file) > 0 ), "Audio file {} duration is zero.".format(sphfile) with open(chA if channel == "A" else chB, "r") as f: for line in f: name = line[0:6].replace("sw", "sw0") channel = line[6] splits = line.strip().split(" ", 3) start = float(splits[1]) end = float(splits[2]) transcript = sanitize(splits[3], acronym_dict) if not transcript: continue utt = "{n}-{c}_{s}-{e}".format( n=name, c=channel, s="{:06d}".format(int(start * 100 + 0.5)), e="{:06d}".format(int(end * 100 + 0.5)), ) out_file = os.path.join(cur_audio_path, "{:09d}.flac".format(idx)) sox_tfm = sox.Transformer() sox_tfm.set_output_format( file_type="flac", encoding="signed-integer", bits=16 ) sox_tfm.trim(start, end) sox_tfm.build(tmp_file, out_file) duration = (end - start) * 1000.0 idx = idx + 1 lines.append( "\t".join( [utt, out_file, "{0:.2f}".format(duration), transcript.lower()] ) ) os.remove(tmp_file) return lines