in src/main/java/com/aws/iot/edgeconnectorforkvs/videouploader/VideoUploaderClient.java [198:247]
/**
 * Uploads the given input stream to the stream named {@code kvsStreamName} through a PutMedia call and blocks
 * until {@code putMediaLatch} is released or the calling thread is interrupted.
 *
 * <p>{@code dataEndpoint} and {@code kvsDataClient} are resolved/created on demand when they are {@code null}.
 * The client is always closed and reset to {@code null} before this method returns, including when the upload
 * fails with an exception, so every invocation works with a fresh client.
 *
 * <p>If the thread is interrupted while waiting, the interruption is logged and the thread's interrupt status
 * is restored right before returning, so callers can still observe it.
 *
 * @param inputStream             payload handed to the PutMedia request; it is closed here only when the upload
 *                                ends without a recorded streaming exception while the task is terminating
 * @param videoUploadingStartTime producer start timestamp set on the PutMedia request
 * @param statusChangedCallBack   forwarded to {@code createResponseHandler}
 * @param uploadCallBack          forwarded to {@code createResponseHandler}
 * @throws KvsStreamingException  declared by the signature; presumably raised by {@code getDataEndpoint()}
 *                                (TODO confirm, not visible in this method)
 */
private void doUploadStream(InputStream inputStream, Date videoUploadingStartTime, Runnable statusChangedCallBack,
                            UploadCallBack uploadCallBack) throws KvsStreamingException {
    if (dataEndpoint == null) {
        dataEndpoint = getDataEndpoint();
    }
    // The latch is handed to the response handler; presumably the handler (or a close request) releases it
    // once the upload is over -- verify against createResponseHandler.
    putMediaLatch = new CountDownLatch(1);
    PutMediaAckResponseHandler rspHandler = createResponseHandler(putMediaLatch, statusChangedCallBack,
            uploadCallBack);
    if (kvsDataClient == null) {
        kvsDataClient = AmazonKinesisVideoPutMediaClient.builder()
                .withRegion(region.getName())
                .withEndpoint(URI.create(dataEndpoint))
                .withCredentials(awsCredentialsProvider)
                .withConnectionTimeoutInMillis(CONNECTION_TIMEOUT_IN_MILLIS)
                .withNumberOfThreads(1)
                .build();
    }
    log.info("Uploading from input stream, timestamp: " + videoUploadingStartTime.getTime());

    // Remember an interruption instead of re-asserting it immediately: re-interrupting right away would make
    // the ACK grace period below (and possibly the client shutdown) abort instantly.
    boolean interrupted = false;
    try {
        kvsDataClient.putMedia(new PutMediaRequest()
                        .withStreamName(kvsStreamName)
                        .withFragmentTimecodeType(FragmentTimecodeType.RELATIVE)
                        .withPayload(inputStream)
                        .withProducerStartTimestamp(videoUploadingStartTime),
                rspHandler);
        try {
            putMediaLatch.await();
            log.info("putMedia end from latch");
        } catch (InterruptedException e) {
            interrupted = true;
            log.debug("Put media is interrupted");
        }
        if (lastKvsStreamingException == null && isTaskTerminating) {
            /* It's ending from close request, let's wait a little to receive ACKs. */
            try {
                inputStream.close();
                Thread.sleep(Constants.UPLOADER_WAIT_FOR_ACKS_DELAY_MILLI_SECONDS);
            } catch (IOException exception) {
                // Pass the exception itself so the stack trace and cause are not lost.
                log.error(exception.getMessage(), exception);
            } catch (InterruptedException exception) {
                interrupted = true;
                log.error(exception.getMessage(), exception);
            }
        }
    } finally {
        try {
            // Release the client even when the upload failed, so a broken client is never reused.
            kvsDataClient.close();
            kvsDataClient = null;
        } finally {
            if (interrupted) {
                // Restore the interrupt status that was cleared when InterruptedException was thrown.
                Thread.currentThread().interrupt();
            }
        }
    }
}