private void startKVSToTranscribeStreaming()

in src/main/java/com/amazonaws/kvstranscribestreaming/handler/KVSTranscribeStreamingHandler.java [133:211]


    private void startKVSToTranscribeStreaming(StreamingStatusStartedDetail detail) throws Exception {

        final String transactionId = detail.getTransactionId();
        final String callId = detail.getCallId();
        final String streamArn = detail.getStreamArn();
        final String startFragmentNumber = detail.getStartFragmentNumber();
        final String startTime = detail.getStartTime();


        Path saveAudioFilePath = Paths.get("/tmp",
                transactionId + "_" + DATE_FORMAT.format(new Date()) + ".raw");
        FileOutputStream fileOutputStream = new FileOutputStream(saveAudioFilePath.toString());

        InputStream kvsInputStream = KVSUtils.getInputStreamFromKVS(streamArn, REGION, startFragmentNumber,
                getAWSCredentials());
        StreamingMkvReader streamingMkvReader = StreamingMkvReader
                .createDefault(new InputStreamParserByteSource(kvsInputStream));

        KVSTransactionIdTagProcessor tagProcessor = new KVSTransactionIdTagProcessor(transactionId);
        FragmentMetadataVisitor fragmentVisitor = FragmentMetadataVisitor.create(Optional.of(tagProcessor));

        if (Boolean.parseBoolean(IS_TRANSCRIBE_ENABLED)) {
            try (TranscribeStreamingRetryClient client = new TranscribeStreamingRetryClient(getTranscribeCredentials(),
                    TRANSCRIBE_ENDPOINT, REGION, metricsUtil)) {

                logger.info("Calling Transcribe service..");

                List<TranscriptionPublisher> publishers = Arrays.asList(new WebSocketTranscriptionPublisher(dynamoDB, detail, getAWSCredentials()),
                        new DynamoDBTranscriptionPublisher(detail, dynamoDB, CONSOLE_LOG_TRANSCRIPT_FLAG));

                CompletableFuture<Void> result = client.startStreamTranscription(
                        // since we're definitely working with telephony audio, we know that's 8 kHz
                        getRequest(8000),
                        new KVSAudioStreamPublisher(streamingMkvReader, transactionId, fileOutputStream, tagProcessor,
                                fragmentVisitor, this.shouldWriteAudioToFile),
                        new StreamTranscriptionBehaviorImpl(publishers));

                // There is no timeout limit for transcription running on ECS. Since Lambda doesn't support function with more than 15 mins
                // Set up a timeout here so that there is enough time for the audio to be uploaded in S3 before function got destoryed.
                if(this.platform.equals(Platform.ECS)) {
                    result.get();
                } else if(this.platform.equals(Platform.LAMBDA)){
                    result.get(LAMBDA_RECORDING_TIMEOUT_IN_SECOND, TimeUnit.SECONDS);
                }
            } catch (TimeoutException e) {
                logger.debug("Timing out KVS to Transcribe Streaming after 900 sec");
            } catch (Exception e) {
                logger.error("[{}] Error during streaming: ", this.transactionId, e);
                throw e;
            } finally {
                // Upload the raw audio regardless of any exception thrown in the middle
                if (this.shouldWriteAudioToFile) {
                    closeFileAndUploadRawAudio(kvsInputStream, fileOutputStream, saveAudioFilePath, transactionId, startTime);
                }
            }
        } else {
            try {
                logger.info("Transcribe is not enabled; saving audio bytes to location");

                while(true)
                {
                    ByteBuffer outputBuffer = KVSUtils.getByteBufferFromStream(streamingMkvReader, fragmentVisitor, tagProcessor,
                            CHUNK_SIZE_IN_KB);

                    if (outputBuffer.remaining() > 0) {
                        //Write audioBytes to a temporary file as they are received from the stream
                        byte[] audioBytes = new byte[outputBuffer.remaining()];
                        outputBuffer.get(audioBytes);
                        fileOutputStream.write(audioBytes);
                    } else {
                        break;
                    }
                }
            } finally {
                // Upload the raw audio regardless of any exception thrown in the middle
                closeFileAndUploadRawAudio(kvsInputStream, fileOutputStream, saveAudioFilePath, transactionId, startTime);
            }
        }
    }