in src/main/java/com/amazonaws/kinesisvideo/internal/service/DefaultServiceCallbacksImpl.java [384:455]
public void putStream(
@Nonnull final String streamName,
@Nonnull final String containerType,
final long streamStartTime,
final boolean absoluteFragmentTimes,
final boolean ackRequired,
@Nonnull final String dataEndpoint,
final long callAfter,
final long timeout,
@Nullable final byte[] authData,
final int authType,
final KinesisVideoProducerStream kinesisVideoProducerStream) throws ProducerException {
Preconditions.checkState(isInitialized(), "Service callbacks object should be initialized first");
final long delay = calculateRelativeServiceCallAfter(callAfter);
final Runnable task = new Runnable() {
@Override
public void run() {
if (kinesisVideoProducerStream == null) {
throw new IllegalStateException("Couldn't find the correct stream");
}
final long timeoutInMillis = timeout / Time.HUNDREDS_OF_NANOS_IN_A_MILLISECOND;
final long streamStartTimeInMillis = streamStartTime / Time.HUNDREDS_OF_NANOS_IN_A_MILLISECOND;
int statusCode = HTTP_OK;
final KinesisVideoCredentialsProvider credentialsProvider = getCredentialsProvider(authData, log);
final long clientUploadHandle = getUploadHandle();
try {
final InputStream dataStream = kinesisVideoProducerStream.getDataStream(clientUploadHandle);
final AckConsumer ackConsumer = new AckConsumer(clientUploadHandle, kinesisVideoProducerStream, log);
final BlockingAckConsumer blockingAckConsumer = new BlockingAckConsumer(ackConsumer, log,
kinesisVideoProducerStream);
final CompletionCallback completionCallback = new CompletionCallback(kinesisVideoProducerStream,
clientUploadHandle);
// This will kick-off a long running operation
kinesisVideoServiceClient.putMedia(streamName,
containerType,
streamStartTimeInMillis,
absoluteFragmentTimes,
ackRequired,
dataEndpoint,
timeoutInMillis,
credentialsProvider,
dataStream,
blockingAckConsumer,
completionCallback);
// Block until we parse the headers
blockingAckConsumer.awaitResponse();
} catch (final KinesisVideoException e) {
statusCode = getStatusCodeFromException(e);
log.error("Kinesis Video service client returned an error " + e.getMessage() + ". Reporting to Kinesis Video PIC.");
}
try {
log.info("putStreamResult uploadHandle " + clientUploadHandle + " status " + statusCode);
kinesisVideoProducer.putStreamResult(kinesisVideoProducerStream, clientUploadHandle, statusCode);
} catch (final ProducerException e) {
throw new RuntimeException(e);
}
}
};
executor.schedule(task, delay, TimeUnit.NANOSECONDS);
}