private void doUploadStream()

in src/main/java/com/aws/iot/edgeconnectorforkvs/videouploader/VideoUploaderClient.java [204:253]


    private void doUploadStream(InputStream inputStream, Date videoUploadingStartTime, Runnable statusChangedCallBack,
                                UploadCallBack uploadCallBack) throws KvsStreamingException {
        if (dataEndpoint == null) {
            dataEndpoint = getDataEndpoint();
        }

        putMediaLatch = new CountDownLatch(1);
        PutMediaAckResponseHandler rspHandler = createResponseHandler(putMediaLatch, statusChangedCallBack,
                uploadCallBack);

        if (kvsDataClient == null) {
            kvsDataClient = AmazonKinesisVideoPutMediaClient.builder()
                    .withRegion(region.getName())
                    .withEndpoint(URI.create(dataEndpoint))
                    .withCredentials(awsCredentialsProvider)
                    .withConnectionTimeoutInMillis(CONNECTION_TIMEOUT_IN_MILLIS)
                    .withNumberOfThreads(1)
                    .build();
        }

        log.info("Uploading from input stream, timestamp: " + videoUploadingStartTime.getTime());
        kvsDataClient.putMedia(new PutMediaRequest()
                        .withStreamName(kvsStreamName)
                        .withFragmentTimecodeType(FragmentTimecodeType.RELATIVE)
                        .withPayload(inputStream)
                        .withProducerStartTimestamp(videoUploadingStartTime),
                rspHandler);

        try {
            putMediaLatch.await();
            log.info("putMedia end from latch");
        } catch (InterruptedException e) {
            log.debug("Put media is interrupted");
        }

        if (lastKvsStreamingException == null && isTaskTerminating) {
            /* It's ending from close request, let's wait a little to receive ACKs. */
            try {
                inputStream.close();
                Thread.sleep(Constants.UPLOADER_WAIT_FOR_ACKS_DELAY_MILLI_SECONDS);
            } catch (IOException exception) {
                log.error(exception.getMessage());
            } catch (InterruptedException exception) {
                log.error(exception.getMessage());
            }
        }

        kvsDataClient.close();
        kvsDataClient = null;
    }