in functions/source/amazon-chime-recordandtranscribe/src/main/java/com/amazonaws/kvstranscribestreaming/handler/KVSTranscribeStreamingHandler.java [141:218]
private void startKVSToTranscribeStreaming(StreamingStatusStartedDetail detail) throws Exception {
final String transactionId = detail.getTransactionId();
final String callId = detail.getCallId();
final String streamArn = detail.getStreamArn();
final String startFragmentNumber = detail.getStartFragmentNumber();
final String startTime = detail.getStartTime();
Path saveAudioFilePath = Paths.get("/tmp",
transactionId + "_" + callId + "_" + DATE_FORMAT.format(new Date()) + ".raw");
FileOutputStream fileOutputStream = new FileOutputStream(saveAudioFilePath.toString());
InputStream kvsInputStream = KVSUtils.getInputStreamFromKVS(streamArn, REGION, startFragmentNumber,
getAWSCredentials());
StreamingMkvReader streamingMkvReader = StreamingMkvReader
.createDefault(new InputStreamParserByteSource(kvsInputStream));
KVSTransactionIdTagProcessor tagProcessor = new KVSTransactionIdTagProcessor(transactionId);
FragmentMetadataVisitor fragmentVisitor = FragmentMetadataVisitor.create(Optional.of(tagProcessor));
if (Boolean.parseBoolean(IS_TRANSCRIBE_ENABLED)) {
try (TranscribeStreamingRetryClient client = new TranscribeStreamingRetryClient(getTranscribeCredentials(),
TRANSCRIBE_ENDPOINT, REGION, metricsUtil)) {
logger.info("Calling Transcribe service..");
List<TranscriptionPublisher> publishers = Arrays.asList(new DynamoDBTranscriptionPublisher(detail, dynamoDB, CONSOLE_LOG_TRANSCRIPT_FLAG, DDB_OUT_NAME, SQS_ENDPOINT, sqs));
CompletableFuture<Void> result = client.startStreamTranscription(
// since we're definitely working with telephony audio, we know that's 8 kHz
getRequest(8000),
new KVSAudioStreamPublisher(streamingMkvReader, transactionId, fileOutputStream, tagProcessor,
fragmentVisitor, this.shouldWriteAudioToFile),
new StreamTranscriptionBehaviorImpl(publishers));
// There is no timeout limit for transcription running on ECS. Since Lambda doesn't support function with more than 15 mins
// Set up a timeout here so that there is enough time for the audio to be uploaded in S3 before function got destoryed.
if(this.platform.equals(Platform.ECS)) {
result.get();
} else if(this.platform.equals(Platform.LAMBDA)){
result.get(LAMBDA_RECORDING_TIMEOUT_IN_SECOND, TimeUnit.SECONDS);
}
} catch (TimeoutException e) {
logger.debug("Timing out KVS to Transcribe Streaming after 900 sec");
} catch (Exception e) {
logger.error("Error during streaming: ", e);
throw e;
} finally {
if (this.shouldWriteAudioToFile) {
closeFileAndUploadRawAudio(kvsInputStream, fileOutputStream, saveAudioFilePath, transactionId, startTime);
}
}
} else {
try {
logger.info("Transcribe is not enabled; saving audio bytes to location");
while(true)
{
ByteBuffer outputBuffer = KVSUtils.getByteBufferFromStream(streamingMkvReader, fragmentVisitor, tagProcessor,
CHUNK_SIZE_IN_KB);
if (outputBuffer.remaining() > 0) {
//Write audioBytes to a temporary file as they are received from the stream
byte[] audioBytes = new byte[outputBuffer.remaining()];
outputBuffer.get(audioBytes);
fileOutputStream.write(audioBytes);
} else {
break;
}
}
} finally {
closeFileAndUploadRawAudio(kvsInputStream, fileOutputStream, saveAudioFilePath, transactionId, startTime);
}
}
}