in src/process_transcription_full_text.py [0:0]
def chunk_up_transcript(custom_vocabs, results):
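    """
    Turn an Amazon Transcribe result into (1) text chunks sized for Amazon
    Comprehend and (2) a readable transcript split into paragraphs, broken on
    speaker changes when speaker labels are present and on long pauses otherwise,
    with custom-vocabulary and common-word substitutions applied along the way.
    """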
    # Shape of the JSON returned by an Amazon Transcribe transcription job:
    # {
    #   "jobName": "JobName",
    #   "accountId": "Your AWS Account Id",
    #   "results": {
    #     "transcripts": [
    #       {
    #         "transcript": "ah ... this is the text of the transcript"
    #       }
    #     ],
    #     "speaker_labels": {
    #       "speakers": 2,
    #       "segments": [
    #         {
    #           "start_time": "0.0",
    #           "speaker_label": "spk_1",
    #           "end_time": "23.84",
    #           "items": [
    #             ...
    #           ]
    #         },
    #         {
    #           "start_time": "23.84",
    #           "speaker_label": "spk_0",
    #           "end_time": "24.87",
    #           "items": [
    #             {
    #               "start_time": "24.063",
    #               "speaker_label": "spk_0",
    #               "end_time": "24.273"
    #             },
    #             {
    #               "start_time": "24.763",
    #               "speaker_label": "spk_0",
    #               "end_time": "25.023"
    #             }
    #           ]
    #         }
    #       ]
    #     },
    #     "items": [
    #       {
    #         "start_time": "0.630",
    #         "end_time": "5.620",
    #         "alternatives": [
    #           {
    #             "confidence": "0.7417",
    #             "content": "ah"
    #           }
    #         ],
    #         "type": "pronunciation"
    #       }
    #     ]
    #   }
    # }
speaker_label_exist = False
speaker_segments = None
if 'speaker_labels' in results:
speaker_label_exist = True
speaker_segments = parse_speaker_segments(results)
items = results['items']
last_speaker = None
paragraphs = []
current_paragraph = ""
comprehend_chunks = []
current_comprehend_chunk = ""
previous_time = 0
last_pause = 0
last_item_was_sentence_end = False
for item in items:
if item["type"] == "pronunciation":
start_time = float(item['start_time'])
if speaker_label_exist:
current_speaker = get_speaker_label(speaker_segments, float(item['start_time']))
                # A change of speaker starts a new paragraph; flush the previous one first.
                if last_speaker is None or current_speaker != last_speaker:
                    if current_paragraph:
                        paragraphs.append(current_paragraph)
                    current_paragraph = current_speaker + " :"
                    last_pause = start_time
                last_speaker = current_speaker
            # Without speaker labels, start a new paragraph after a pause longer than
            # 2 seconds, or after 15 seconds of speech once a sentence has ended.
            elif (start_time - previous_time) > 2 or (
                    (start_time - last_pause) > 15 and last_item_was_sentence_end):
                last_pause = start_time
                if current_paragraph:
                    paragraphs.append(current_paragraph)
                current_paragraph = ""
phrase = item['alternatives'][0]['content']
if custom_vocabs is not None:
if phrase in custom_vocabs:
phrase = custom_vocabs[phrase]
logger.info("replaced custom vocab: " + phrase)
if phrase in commonDict:
phrase = commonDict[phrase]
current_paragraph += " " + phrase
            # Accumulate the same phrase into the running Comprehend chunk.
current_comprehend_chunk += " " + phrase
last_item_was_sentence_end = False
elif item["type"] == "punctuation":
current_paragraph += item['alternatives'][0]['content']
current_comprehend_chunk += item['alternatives'][0]['content']
if item['alternatives'][0]['content'] in (".", "!", "?"):
last_item_was_sentence_end = True
else:
last_item_was_sentence_end = False
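        # Close out the current chunk once it is large enough: prefer to break at
        # punctuation (>= 4500 chars), and force a break past 4900 chars so each
        # chunk stays within Amazon Comprehend's per-document size limit.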
if (item["type"] == "punctuation" and len(current_comprehend_chunk) >= 4500) \
or len(current_comprehend_chunk) > 4900:
comprehend_chunks.append(current_comprehend_chunk)
current_comprehend_chunk = ""
if 'end_time' in item:
previous_time = float(item['end_time'])
    # Flush anything left over in the final chunk and paragraph.
    if current_comprehend_chunk != "":
        comprehend_chunks.append(current_comprehend_chunk)
    if current_paragraph != "":
        paragraphs.append(current_paragraph)
logger.debug(json.dumps(paragraphs, indent=4))
logger.debug(json.dumps(comprehend_chunks, indent=4))
return comprehend_chunks, "\n\n".join(paragraphs)
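

# chunk_up_transcript depends on parse_speaker_segments and get_speaker_label,
# which are defined elsewhere in this module. The pair below is a hypothetical
# sketch of how such helpers could be built from the speaker_labels structure
# documented above; it is illustrative only and may differ from the real helpers.
def example_parse_speaker_segments(results):
    # Flatten speaker_labels segments into dicts with float time bounds.
    segments = []
    for segment in results['speaker_labels']['segments']:
        segments.append({
            'start_time': float(segment['start_time']),
            'end_time': float(segment['end_time']),
            'speaker_label': segment['speaker_label']
        })
    return segments


def example_get_speaker_label(speaker_segments, start_time):
    # Return the label of the segment whose time range contains start_time.
    for segment in speaker_segments:
        if segment['start_time'] <= start_time <= segment['end_time']:
            return segment['speaker_label']
    return None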
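

# Minimal usage sketch, assuming a Transcribe job result saved locally as
# 'transcribe_output.json' (a hypothetical path) and no custom vocabulary mapping.
# It prints the number of Comprehend-sized chunks and the formatted transcript
# that chunk_up_transcript returns.
if __name__ == '__main__':
    with open('transcribe_output.json') as f:
        transcribe_json = json.load(f)

    chunks, formatted_transcript = chunk_up_transcript(None, transcribe_json['results'])
    print('comprehend chunks:', len(chunks))
    print(formatted_transcript)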