in src/main/java/com/aws/iot/edgeconnectorforkvs/EdgeConnectorForKVSService.java [686:742]
private void initMQTTSubscription() {
edgeConnectorForKVSConfigurationList
.forEach(configuration -> {
VideoUploadRequestEvent event = new VideoUploadRequestEvent() {
@Override
public void onStart(boolean isLive, long updateTimestamp, long startTime, long endTime) {
if (isLive) {
log.info("Received Live Streaming request for stream: "
+ configuration.getKinesisVideoStreamName());
ScheduledFuture<?> future = configuration.getStopLiveStreamingTaskFuture();
if (future == null) {
// Kick-off Live Streaming for 5 mins
// do it only for the first request
log.info("Start Live Streaming");
liveStreamingExecutor.submit(() -> {
try {
startLiveVideoStreaming(configuration);
} catch (Exception ex) {
log.error("Error starting live video streaming." + ex.getMessage());
Constants.setFatalStatus(true);
}
});
} else {
log.info("Live Streaming was already started. Continue Streaming.");
// Cancel the previously started scheduled task
// and restart the task below
future.cancel(false);
}
Runnable task = getStopLiveStreamingTask(configuration);
future = stopLiveStreamingExecutor.schedule(task,
LIVE_STREAMING_STOP_TIMER_DELAY_IN_SECONDS, TimeUnit.SECONDS);
configuration.setStopLiveStreamingTaskFuture(future);
log.info("Schedule Live Streaming to stop after " +
LIVE_STREAMING_STOP_TIMER_DELAY_IN_SECONDS + "s for stream: " +
configuration.getKinesisVideoStreamName());
} else {
try {
startHistoricalVideoUploading(configuration, startTime, endTime);
} catch (Exception ex) {
log.error("Error starting historical video uploading." + ex.getMessage());
Constants.setFatalStatus(true);
}
}
}
@Override
public void onError(String errMessage) {
log.info("MQTT Error " + errMessage + " for stream "
+ configuration.getKinesisVideoStreamName());
}
};
if (configuration.getVideoUploadRequestMqttTopic() != null) {
videoUploadRequestHandler.subscribeToMqttTopic(configuration.getVideoUploadRequestMqttTopic(),
event);
}
});
}