private void initMQTTSubscription()

in src/main/java/com/aws/iot/edgeconnectorforkvs/EdgeConnectorForKVSService.java [686:742]


    /**
     * Registers a video-upload-request callback on the MQTT topic of every configured stream.
     *
     * <p>Each entry of {@code edgeConnectorForKVSConfigurationList} whose upload-request topic is
     * non-null gets its own {@link VideoUploadRequestEvent} subscribed through
     * {@code videoUploadRequestHandler}; entries without a topic are skipped.
     *
     * <p>Behaviour of the registered callback:
     * <ul>
     *   <li><b>Live request</b> - if the configuration holds no stop-task future, live streaming
     *       is submitted to {@code liveStreamingExecutor}; otherwise the pending stop task is
     *       cancelled. In both cases a new stop task is scheduled
     *       {@code LIVE_STREAMING_STOP_TIMER_DELAY_IN_SECONDS} seconds ahead and stored on the
     *       configuration, so each request pushes the stop time back.</li>
     *   <li><b>Historical request</b> - {@code startHistoricalVideoUploading} is invoked directly
     *       on the calling thread with the requested time range.</li>
     *   <li>Any exception while starting either mode is logged with its stack trace and the
     *       fatal status flag is raised via {@code Constants.setFatalStatus(true)}.</li>
     * </ul>
     */
    private void initMQTTSubscription() {
        edgeConnectorForKVSConfigurationList.forEach(configuration -> {
            // Nothing to subscribe to: skip before building a callback that would never be used.
            final String mqttTopic = configuration.getVideoUploadRequestMqttTopic();
            if (mqttTopic == null) {
                return;
            }

            VideoUploadRequestEvent event = new VideoUploadRequestEvent() {
                @Override
                public void onStart(boolean isLive, long updateTimestamp, long startTime, long endTime) {
                    if (!isLive) {
                        try {
                            startHistoricalVideoUploading(configuration, startTime, endTime);
                        } catch (Exception ex) {
                            // Pass the throwable itself so the stack trace and cause are kept.
                            log.error("Error starting historical video uploading.", ex);
                            Constants.setFatalStatus(true);
                        }
                        return;
                    }

                    log.info("Received Live Streaming request for stream: "
                            + configuration.getKinesisVideoStreamName());
                    ScheduledFuture<?> pendingStop = configuration.getStopLiveStreamingTaskFuture();
                    if (pendingStop == null) {
                        // First request: no stop task is registered yet, so start streaming.
                        log.info("Start Live Streaming");
                        liveStreamingExecutor.submit(() -> {
                            try {
                                startLiveVideoStreaming(configuration);
                            } catch (Exception ex) {
                                // Pass the throwable itself so the stack trace and cause are kept.
                                log.error("Error starting live video streaming.", ex);
                                Constants.setFatalStatus(true);
                            }
                        });
                    } else {
                        log.info("Live Streaming was already started. Continue Streaming.");
                        // Drop the previously scheduled stop (without interrupting it if it is
                        // already running); a fresh one is scheduled below.
                        pendingStop.cancel(false);
                    }

                    // (Re)arm the stop timer so streaming ends a fixed delay after the latest request.
                    Runnable stopTask = getStopLiveStreamingTask(configuration);
                    ScheduledFuture<?> rescheduledStop = stopLiveStreamingExecutor.schedule(stopTask,
                            LIVE_STREAMING_STOP_TIMER_DELAY_IN_SECONDS, TimeUnit.SECONDS);
                    configuration.setStopLiveStreamingTaskFuture(rescheduledStop);
                    log.info("Schedule Live Streaming to stop after "
                            + LIVE_STREAMING_STOP_TIMER_DELAY_IN_SECONDS + "s for stream: "
                            + configuration.getKinesisVideoStreamName());
                }

                @Override
                public void onError(String errMessage) {
                    // Reported at error level so it is not lost when info logging is filtered out.
                    log.error("MQTT Error " + errMessage + " for stream "
                            + configuration.getKinesisVideoStreamName());
                }
            };

            videoUploadRequestHandler.subscribeToMqttTopic(mqttTopic, event);
        });
    }