in src/main/java/com/amazonaws/kvstranscribestreaming/KVSTranscribeStreamingLambda.java [140:219]
/**
 * Opens the enabled audio tracks of a KVS stream and either transcribes them or only saves their audio.
 *
 * <p>For every enabled direction a {@link KVSStreamTrackObject} is obtained through
 * {@code getKVSStreamTrackObject}. When {@code transcribeEnabled} is true, one streaming transcription
 * is started per opened track on a shared {@code TranscribeStreamingRetryClient}, and this method blocks
 * until the transcriptions finish or a single 890 second budget (shared by both tracks) runs out.
 * When {@code transcribeEnabled} is false, the audio of each opened track is written out through
 * {@code writeAudioBytesToKvsStream}. In both modes {@code closeFileAndUploadRawAudio} is invoked for
 * every opened track before returning, even if the other track's cleanup fails.
 *
 * @param streamARN                        ARN of the KVS stream; the stream name is taken as the text between its
 *                                         first and last {@code "/"} (so the ARN must contain at least two)
 * @param startFragmentNum                 fragment number passed to {@code getKVSStreamTrackObject} as the start position
 * @param contactId                        contact identifier forwarded to the track, transcription and upload helpers
 * @param transcribeEnabled                true to transcribe the audio, false to only write the audio bytes
 * @param languageCode                     optional language code forwarded to the transcription request
 * @param saveCallRecording                optional flag forwarded to {@code closeFileAndUploadRawAudio}
 * @param isStreamAudioFromCustomerEnabled whether the AUDIO_FROM_CUSTOMER track is processed
 * @param isStreamAudioToCustomerEnabled   whether the AUDIO_TO_CUSTOMER track is processed
 * @throws Exception any failure other than the wait timing out; streaming failures are logged and rethrown,
 *                   and failures raised while closing/uploading a track propagate to the caller
 */
private void startKVSToTranscribeStreaming(String streamARN, String startFragmentNum, String contactId, boolean transcribeEnabled,
                                           Optional<String> languageCode, Optional<Boolean> saveCallRecording,
                                           boolean isStreamAudioFromCustomerEnabled, boolean isStreamAudioToCustomerEnabled) throws Exception {
    // The stream name is whatever sits between the first and the last "/" of the ARN.
    String streamName = streamARN.substring(streamARN.indexOf("/") + 1, streamARN.lastIndexOf("/"));

    KVSStreamTrackObject kvsStreamTrackObjectFromCustomer = null;
    KVSStreamTrackObject kvsStreamTrackObjectToCustomer = null;
    if (isStreamAudioFromCustomerEnabled) {
        kvsStreamTrackObjectFromCustomer = getKVSStreamTrackObject(streamName, startFragmentNum,
                KVSUtils.TrackName.AUDIO_FROM_CUSTOMER.getName(), contactId);
    }
    if (isStreamAudioToCustomerEnabled) {
        kvsStreamTrackObjectToCustomer = getKVSStreamTrackObject(streamName, startFragmentNum,
                KVSUtils.TrackName.AUDIO_TO_CUSTOMER.getName(), contactId);
    }

    if (transcribeEnabled) {
        try (TranscribeStreamingRetryClient client = new TranscribeStreamingRetryClient(getTranscribeCredentials(),
                TRANSCRIBE_ENDPOINT, TRANSCRIBE_REGION, metricsUtil)) {
            logger.info("Calling Transcribe service..");

            CompletableFuture<Void> fromCustomerResult = null;
            CompletableFuture<Void> toCustomerResult = null;
            if (kvsStreamTrackObjectFromCustomer != null) {
                fromCustomerResult = getStartStreamingTranscriptionFuture(kvsStreamTrackObjectFromCustomer,
                        languageCode, contactId, client, fromCustomerSegmentWriter, TABLE_CALLER_TRANSCRIPT,
                        KVSUtils.TrackName.AUDIO_FROM_CUSTOMER.getName());
            }
            if (kvsStreamTrackObjectToCustomer != null) {
                toCustomerResult = getStartStreamingTranscriptionFuture(kvsStreamTrackObjectToCustomer,
                        languageCode, contactId, client, toCustomerSegmentWriter, TABLE_CALLER_TRANSCRIPT_TO_CUSTOMER,
                        KVSUtils.TrackName.AUDIO_TO_CUSTOMER.getName());
            }

            // Synchronous wait for the streams to close; the client connection is closed by try-with-resources.
            // A Lambda invocation can run for at most 15 minutes (900 s), so the wait is capped at 890 s.
            // Both futures share ONE deadline: giving each its own 890 s timeout could add up to ~1780 s
            // and overrun the invocation limit before the cleanup below gets a chance to run.
            final long deadlineNanos = System.nanoTime() + TimeUnit.SECONDS.toNanos(890);
            awaitBeforeDeadline(fromCustomerResult, deadlineNanos);
            awaitBeforeDeadline(toCustomerResult, deadlineNanos);
        } catch (TimeoutException e) {
            // Running out of time is expected for long calls; fall through to cleanup without failing.
            logger.debug("Timing out KVS to Transcribe Streaming after 890 sec");
        } catch (Exception e) {
            logger.error("Error during streaming: ", e);
            throw e;
        } finally {
            closeAndUploadTracks(kvsStreamTrackObjectFromCustomer, kvsStreamTrackObjectToCustomer,
                    contactId, saveCallRecording);
        }
    } else {
        try {
            logger.info("Saving audio bytes to location");
            // Write audio bytes from the KVS stream to the temporary file
            if (kvsStreamTrackObjectFromCustomer != null) {
                writeAudioBytesToKvsStream(kvsStreamTrackObjectFromCustomer, contactId);
            }
            if (kvsStreamTrackObjectToCustomer != null) {
                writeAudioBytesToKvsStream(kvsStreamTrackObjectToCustomer, contactId);
            }
        } finally {
            closeAndUploadTracks(kvsStreamTrackObjectFromCustomer, kvsStreamTrackObjectToCustomer,
                    contactId, saveCallRecording);
        }
    }
}

/**
 * Blocks until {@code future} completes or the shared deadline passes; does nothing for a null future.
 *
 * @param future        transcription future to wait on, or null when that track is disabled
 * @param deadlineNanos absolute deadline expressed on the {@link System#nanoTime()} clock
 * @throws TimeoutException if the future is still running once the deadline has been reached
 */
private static void awaitBeforeDeadline(CompletableFuture<Void> future, long deadlineNanos)
        throws InterruptedException, java.util.concurrent.ExecutionException, TimeoutException {
    if (future == null) {
        return;
    }
    // Clamp to zero: an already-completed future still returns normally, a pending one times out at once.
    long remainingNanos = Math.max(0L, deadlineNanos - System.nanoTime());
    future.get(remainingNanos, TimeUnit.NANOSECONDS);
}

/**
 * Runs {@code closeFileAndUploadRawAudio} for each opened track, attempting the to-customer track
 * even when handling the from-customer track throws.
 *
 * @param fromCustomerTrack track carrying audio from the customer, or null if it was not opened
 * @param toCustomerTrack   track carrying audio to the customer, or null if it was not opened
 * @param contactId         contact identifier forwarded to {@code closeFileAndUploadRawAudio}
 * @param saveCallRecording optional flag forwarded to {@code closeFileAndUploadRawAudio}
 * @throws Exception whatever {@code closeFileAndUploadRawAudio} raises; if both tracks fail,
 *                   the to-customer failure is the one that propagates
 */
private void closeAndUploadTracks(KVSStreamTrackObject fromCustomerTrack, KVSStreamTrackObject toCustomerTrack,
                                  String contactId, Optional<Boolean> saveCallRecording) throws Exception {
    try {
        if (fromCustomerTrack != null) {
            closeFileAndUploadRawAudio(fromCustomerTrack, contactId, saveCallRecording);
        }
    } finally {
        if (toCustomerTrack != null) {
            closeFileAndUploadRawAudio(toCustomerTrack, contactId, saveCallRecording);
        }
    }
}