public void subscribeToMqttTopic()

in src/main/java/com/aws/iot/edgeconnectorforkvs/handler/VideoUploadRequestHandler.java [90:173]


    /**
     * Subscribes to the given MQTT topic through the Greengrass IPC client and forwards
     * every valid video upload request received on it to {@code event}.
     *
     * <p>Each incoming message is decoded as UTF-8 JSON and must carry exactly one asset
     * property value. The string value of that property is interpreted as either:
     * <ul>
     *   <li>{@code MQTT_LIVE_VIDEO_UPLOAD_REQUEST_KEY} (case-insensitive): a live upload
     *       request, reported as {@code event.onStart(true, updateTimestamp, 0, 0)};</li>
     *   <li>{@code "<start>-<end>"}, two {@code long} values separated by a single hyphen:
     *       an on-demand upload request, reported as
     *       {@code event.onStart(false, updateTimestamp, start, end)}.</li>
     * </ul>
     * Malformed messages are logged and dropped; {@code event.onStart} is NOT invoked for
     * them, so an unparseable time range never triggers an upload.
     *
     * <p>The subscription uses QoS {@code AT_MOST_ONCE}. This method blocks until the
     * subscribe response is available.
     *
     * @param mqttTopic the MQTT topic name to subscribe to
     * @param event     callback notified of valid upload requests ({@code onStart}) and of
     *                  stream errors ({@code onError})
     * @throws EdgeConnectorForKVSException if the subscription fails or the calling thread
     *                                      is interrupted while waiting for the response
     */
    public void subscribeToMqttTopic(String mqttTopic, VideoUploadRequestEvent event) {
        final StreamResponseHandler<IoTCoreMessage> streamResponseHandler =
                new StreamResponseHandler<IoTCoreMessage>() {
            @Override
            public void onStreamEvent(IoTCoreMessage ioTCoreMessage) {
                log.info("onStreamEvent");
                MQTTMessage mqttMessage = ioTCoreMessage.getMessage();
                if (mqttMessage == null || mqttMessage.getPayload() == null) {
                    log.error("Empty MQTT Message Received");
                    return;
                }
                String payload = new String(mqttMessage.getPayload(), StandardCharsets.UTF_8);
                VideoUploadRequestMessage videoUploadRequestMessage = JSONUtils
                        .jsonToVideoUplaodRequestMessage(payload);
                // The payload comes from the network; guard against JSON that lacks the
                // expected structure instead of failing with a NullPointerException on the
                // IPC callback thread.
                if (videoUploadRequestMessage == null
                        || videoUploadRequestMessage.getPayload() == null
                        || videoUploadRequestMessage.getPayload().getValues() == null) {
                    log.error("Invalid VideoUploadRequest Event Received. Payload values missing");
                    return;
                }

                List<AssetPropertyValue> propertyValueEntry = videoUploadRequestMessage.getPayload().getValues();
                if (propertyValueEntry.size() != 1) {
                    log.error("Invalid VideoUploadRequest Event Received. Single value required");
                    return;
                }

                AssetPropertyValue assetPropertyValue = propertyValueEntry.get(0);
                long propertyUpdateTimestamp = assetPropertyValue.timestamp().timeInSeconds();
                String value = assetPropertyValue.value().stringValue();
                if (value == null) {
                    log.error("Invalid VideoUploadRequest Event Received. String value required");
                    return;
                }

                if (value.equalsIgnoreCase(MQTT_LIVE_VIDEO_UPLOAD_REQUEST_KEY)) {
                    // Live Uploading Request: the time range is not used, so pass zeros.
                    log.info("Live Streaming Request Received");
                    event.onStart(true, propertyUpdateTimestamp, 0L, 0L);
                    return;
                }

                // On-Demand Video Uploading Request: expected format is "<start>-<end>".
                String[] timestamps = value.split("-");
                if (timestamps.length != 2) {
                    log.error("Invalid VideoUploadRequest Event Received. Single hyphen required");
                    return;
                }
                long startTimestamp;
                long endTimestamp;
                try {
                    startTimestamp = Long.parseLong(timestamps[0]);
                    endTimestamp = Long.parseLong(timestamps[1]);
                } catch (NumberFormatException ex) {
                    // Do not fall through to onStart with a bogus 0-0 range.
                    log.error("Invalid VideoUploadRequest Event Received. " + ex.getMessage());
                    return;
                }
                log.info("On-Demand Streaming Request Received for Time Range "
                        + timestamps[0] + "-" + timestamps[1]);
                event.onStart(false, propertyUpdateTimestamp, startTimestamp, endTimestamp);
            }

            @Override
            public boolean onStreamError(Throwable throwable) {
                log.error("onStream Error: " + throwable.getMessage());
                event.onError(throwable.getMessage());
                // NOTE(review): the false return value is presumably what keeps the stream
                // open after an error - confirm against the Greengrass IPC SDK contract.
                return false;
            }

            @Override
            public void onStreamClosed() {
                log.info("onStream Closed Called");
            }
        };
        try {
            SubscribeToIoTCoreRequest subscribeToIoTCoreRequest = new SubscribeToIoTCoreRequest();
            subscribeToIoTCoreRequest.setTopicName(mqttTopic);
            subscribeToIoTCoreRequest.setQos(QOS.AT_MOST_ONCE);
            SubscribeToIoTCoreResponseHandler operationResponseHandler = greengrassCoreIPCClient
                    .subscribeToIoTCore(subscribeToIoTCoreRequest, Optional.of(streamResponseHandler));
            // Block until the subscribe operation has been acknowledged.
            SubscribeToIoTCoreResponse resp = operationResponseHandler.getResponse().get();
            log.info("Subscribe to MQTT Response: " + resp.toString());
        } catch (ExecutionException ex) {
            final String errorMessage = String.format("Could not Subscribe to MQTT topic %s. %s", mqttTopic,
                    ex.getMessage());
            log.error(errorMessage);
            throw new EdgeConnectorForKVSException(errorMessage, ex);
        } catch (InterruptedException ex) {
            final String errorMessage = String.format("Could not Subscribe to MQTT topic %s. %s", mqttTopic,
                    ex.getMessage());
            log.error(errorMessage);
            // restore interrupted state
            Thread.currentThread().interrupt();
            throw new EdgeConnectorForKVSException(errorMessage, ex);
        }
    }